12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613 |
- # Copyright 2020 Google LLC
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import datetime
- import http.client as http_client
- import json
- import os
- import urllib
- import mock
- import pytest # type: ignore
- from google.auth import _helpers, external_account
- from google.auth import exceptions
- from google.auth import identity_pool
- from google.auth import metrics
- from google.auth import transport
- from google.auth.credentials import DEFAULT_UNIVERSE_DOMAIN
# Client credentials shared by the tests in this module.
- CLIENT_ID = "username"
- CLIENT_SECRET = "password"
- # Base64 encoding of "username:password".
- BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
# Service account impersonation endpoint, assembled from a regional base URL
# and the generateAccessToken route for SERVICE_ACCOUNT_EMAIL.
- SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
- SERVICE_ACCOUNT_IMPERSONATION_URL_BASE = (
- "https://us-east1-iamcredentials.googleapis.com"
- )
- SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE = "/v1/projects/-/serviceAccounts/{}:generateAccessToken".format(
- SERVICE_ACCOUNT_EMAIL
- )
- SERVICE_ACCOUNT_IMPERSONATION_URL = (
- SERVICE_ACCOUNT_IMPERSONATION_URL_BASE + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE
- )
- QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
- SCOPES = ["scope1", "scope2"]
# The fixture directory is the "data" folder next to this file, resolved via
# yatest.common.source_path. The import sits here, mid-module, rather than in
# the top import block.
- import yatest.common as yc
- DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "data")
- SUBJECT_TOKEN_TEXT_FILE = os.path.join(DATA_DIR, "external_subject_token.txt")
- SUBJECT_TOKEN_JSON_FILE = os.path.join(DATA_DIR, "external_subject_token.json")
- SUBJECT_TOKEN_FIELD_NAME = "access_token"
# Both fixture files are read once, at import time; a missing file fails the
# whole module at import.
- with open(SUBJECT_TOKEN_TEXT_FILE) as fh:
- TEXT_FILE_SUBJECT_TOKEN = fh.read()
- with open(SUBJECT_TOKEN_JSON_FILE) as fh:
- JSON_FILE_CONTENT = json.load(fh)
# Token pulled out of the JSON fixture under SUBJECT_TOKEN_FIELD_NAME
# (None if the fixture lacks that key, since .get is used).
- JSON_FILE_SUBJECT_TOKEN = JSON_FILE_CONTENT.get(SUBJECT_TOKEN_FIELD_NAME)
# STS endpoints and the default (workload identity pool) audience/token type.
- TOKEN_URL = "https://sts.googleapis.com/v1/token"
- TOKEN_INFO_URL = "https://sts.googleapis.com/v1/introspect"
- SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
- AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
# Workforce pool counterparts of the audience/token type above.
- WORKFORCE_AUDIENCE = (
- "//iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID"
- )
- WORKFORCE_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token"
- WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER"
# Sample STS token URLs labelled valid.
# NOTE(review): no consumer of the four URL lists below is visible in this part
# of the file; presumably they feed URL-validation tests further down - confirm.
- VALID_TOKEN_URLS = [
- "https://sts.googleapis.com",
- "https://us-east-1.sts.googleapis.com",
- "https://US-EAST-1.sts.googleapis.com",
- "https://sts.us-east-1.googleapis.com",
- "https://sts.US-WEST-1.googleapis.com",
- "https://us-east-1-sts.googleapis.com",
- "https://US-WEST-1-sts.googleapis.com",
- "https://us-west-1-sts.googleapis.com/path?query",
- "https://sts-us-east-1.p.googleapis.com",
- ]
# Sample STS token URLs labelled invalid: wrong service, missing or wrong
# scheme, malformed hosts, and look-alike domains.
- INVALID_TOKEN_URLS = [
- "https://iamcredentials.googleapis.com",
- "sts.googleapis.com",
- "https://",
- "http://sts.googleapis.com",
- "https://st.s.googleapis.com",
- "https://us-eas\t-1.sts.googleapis.com",
- "https:/us-east-1.sts.googleapis.com",
- "https://US-WE/ST-1-sts.googleapis.com",
- "https://sts-us-east-1.googleapis.com",
- "https://sts-US-WEST-1.googleapis.com",
- "testhttps://us-east-1.sts.googleapis.com",
- "https://us-east-1.sts.googleapis.comevil.com",
- "https://us-east-1.us-east-1.sts.googleapis.com",
- "https://us-ea.s.t.sts.googleapis.com",
- "https://sts.googleapis.comevil.com",
- "hhttps://us-east-1.sts.googleapis.com",
- "https://us- -1.sts.googleapis.com",
- "https://-sts.googleapis.com",
- "https://us-east-1.sts.googleapis.com.evil.com",
- "https://sts.pgoogleapis.com",
- "https://p.googleapis.com",
- "https://sts.p.com",
- "http://sts.p.googleapis.com",
- "https://xyz-sts.p.googleapis.com",
- "https://sts-xyz.123.p.googleapis.com",
- "https://sts-xyz.p1.googleapis.com",
- "https://sts-xyz.p.foo.com",
- "https://sts-xyz.p.foo.googleapis.com",
- ]
# Sample service account impersonation URLs labelled valid; these mirror
# VALID_TOKEN_URLS with "iamcredentials" in place of "sts".
- VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS = [
- "https://iamcredentials.googleapis.com",
- "https://us-east-1.iamcredentials.googleapis.com",
- "https://US-EAST-1.iamcredentials.googleapis.com",
- "https://iamcredentials.us-east-1.googleapis.com",
- "https://iamcredentials.US-WEST-1.googleapis.com",
- "https://us-east-1-iamcredentials.googleapis.com",
- "https://US-WEST-1-iamcredentials.googleapis.com",
- "https://us-west-1-iamcredentials.googleapis.com/path?query",
- "https://iamcredentials-us-east-1.p.googleapis.com",
- ]
# Sample service account impersonation URLs labelled invalid; these mirror
# INVALID_TOKEN_URLS with the two service names swapped.
- INVALID_SERVICE_ACCOUNT_IMPERSONATION_URLS = [
- "https://sts.googleapis.com",
- "iamcredentials.googleapis.com",
- "https://",
- "http://iamcredentials.googleapis.com",
- "https://iamcre.dentials.googleapis.com",
- "https://us-eas\t-1.iamcredentials.googleapis.com",
- "https:/us-east-1.iamcredentials.googleapis.com",
- "https://US-WE/ST-1-iamcredentials.googleapis.com",
- "https://iamcredentials-us-east-1.googleapis.com",
- "https://iamcredentials-US-WEST-1.googleapis.com",
- "testhttps://us-east-1.iamcredentials.googleapis.com",
- "https://us-east-1.iamcredentials.googleapis.comevil.com",
- "https://us-east-1.us-east-1.iamcredentials.googleapis.com",
- "https://us-ea.s.t.iamcredentials.googleapis.com",
- "https://iamcredentials.googleapis.comevil.com",
- "hhttps://us-east-1.iamcredentials.googleapis.com",
- "https://us- -1.iamcredentials.googleapis.com",
- "https://-iamcredentials.googleapis.com",
- "https://us-east-1.iamcredentials.googleapis.com.evil.com",
- "https://iamcredentials.pgoogleapis.com",
- "https://p.googleapis.com",
- "https://iamcredentials.p.com",
- "http://iamcredentials.p.googleapis.com",
- "https://xyz-iamcredentials.p.googleapis.com",
- "https://iamcredentials-xyz.123.p.googleapis.com",
- "https://iamcredentials-xyz.p1.googleapis.com",
- "https://iamcredentials-xyz.p.foo.com",
- "https://iamcredentials-xyz.p.foo.googleapis.com",
- ]
class TestSubjectTokenSupplier(identity_pool.SubjectTokenSupplier):
    """Configurable subject token supplier used as a test double.

    Depending on how it is constructed, ``get_subject_token`` returns a
    canned token, raises a canned exception, and/or checks the context it
    is handed against an expected one.
    """

    def __init__(
        self, subject_token=None, subject_token_exception=None, expected_context=None
    ):
        """Store the canned token, the canned exception and the expected context."""
        self._expected_context = expected_context
        self._subject_token_exception = subject_token_exception
        self._subject_token = subject_token

    def get_subject_token(self, context, request):
        """Return the canned token, or raise the canned exception if one was given.

        When an expected context was configured, it is compared with
        ``context`` first. ``request`` is accepted but never used.
        """
        expected = self._expected_context
        if expected is not None:
            assert expected == context

        failure = self._subject_token_exception
        if failure is not None:
            raise failure

        return self._subject_token
- class TestCredentials(object):
# File-based credential sources: plain text, and JSON with the token stored
# under the "access_token" field.
- CREDENTIAL_SOURCE_TEXT = {"file": SUBJECT_TOKEN_TEXT_FILE}
- CREDENTIAL_SOURCE_JSON = {
- "file": SUBJECT_TOKEN_JSON_FILE,
- "format": {"type": "json", "subject_token_field_name": "access_token"},
- }
# URL-based credential sources: the same text/JSON pair, pointing at a fake URL.
- CREDENTIAL_URL = "http://fakeurl.com"
- CREDENTIAL_SOURCE_TEXT_URL = {"url": CREDENTIAL_URL}
- CREDENTIAL_SOURCE_JSON_URL = {
- "url": CREDENTIAL_URL,
- "format": {"type": "json", "subject_token_field_name": "access_token"},
- }
# Certificate-based credential sources. Note the default-config flag here is
# the string "true", not the boolean True.
- CREDENTIAL_SOURCE_CERTIFICATE = {
- "certificate": {"use_default_certificate_config": "true"}
- }
- CREDENTIAL_SOURCE_CERTIFICATE_NOT_DEFAULT = {
- "certificate": {"certificate_config_location": "path/to/config"}
- }
# Canned token-exchange response; copied by the refresh helper and used as the
# mocked STS reply.
- SUCCESS_RESPONSE = {
- "access_token": "ACCESS_TOKEN",
- "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "token_type": "Bearer",
- "expires_in": 3600,
- "scope": " ".join(SCOPES),
- }
@classmethod
def make_mock_response(cls, status, data):
    """Build an autospec'd ``transport.Response`` with the given status and body.

    A ``dict`` body is serialized to UTF-8 encoded JSON bytes; any other
    value is attached to the response unchanged.
    """
    mocked = mock.create_autospec(transport.Response, instance=True)
    mocked.status = status
    mocked.data = (
        json.dumps(data).encode("utf-8") if isinstance(data, dict) else data
    )
    return mocked
@classmethod
def make_mock_request(
    cls, token_status=http_client.OK, token_data=None, *extra_requests
):
    """Build an autospec'd ``transport.Request`` that replays canned responses.

    The first response is built from ``token_status`` / ``token_data``.
    ``extra_requests`` is a flat sequence of further ``status, data`` pairs
    (for example the service account impersonation response), replayed in
    order after the first one.
    """
    queued = [cls.make_mock_response(token_status, token_data)]
    # Walk the flat tail two items at a time: (status, data), (status, data), ...
    for offset in range(0, len(extra_requests), 2):
        status = extra_requests[offset]
        data = extra_requests[offset + 1]
        queued.append(cls.make_mock_response(status, data))

    mocked_request = mock.create_autospec(transport.Request)
    mocked_request.side_effect = queued
    return mocked_request
@classmethod
def assert_credential_request_kwargs(
    cls, request_kwargs, headers, url=CREDENTIAL_URL
):
    """Assert that ``request_kwargs`` describe a body-less GET to ``url``.

    Checks the URL, the ``GET`` method, the headers, and that no body (or a
    ``None`` body) was supplied.
    """
    expected_fields = (("url", url), ("method", "GET"), ("headers", headers))
    for field, expected in expected_fields:
        assert request_kwargs[field] == expected
    assert request_kwargs.get("body") is None
@classmethod
def assert_token_request_kwargs(
    cls, request_kwargs, headers, request_data, token_url=TOKEN_URL
):
    """Assert that ``request_kwargs`` describe the expected token POST.

    Checks the URL, the ``POST`` method and the headers, then parses the
    body as a query string and compares each decoded key/value pair against
    ``request_data``. The number of parsed pairs must equal the number of
    expected fields.
    """
    expected_fields = (
        ("url", token_url),
        ("method", "POST"),
        ("headers", headers),
    )
    for field, expected in expected_fields:
        assert request_kwargs[field] == expected

    raw_body = request_kwargs["body"]
    assert raw_body is not None

    parsed_pairs = urllib.parse.parse_qsl(raw_body)
    assert len(parsed_pairs) == len(request_data)
    # Keys and values are decoded from UTF-8 before comparison, so the body
    # is expected to be bytes.
    for raw_key, raw_value in parsed_pairs:
        assert raw_value.decode("utf-8") == request_data[raw_key.decode("utf-8")]
@classmethod
def assert_impersonation_request_kwargs(
    cls,
    request_kwargs,
    headers,
    request_data,
    service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
):
    """Assert that ``request_kwargs`` describe the expected impersonation POST.

    Checks the URL, the ``POST`` method and the headers, then decodes the
    body as UTF-8 JSON and compares it with ``request_data``.
    """
    expected_fields = (
        ("url", service_account_impersonation_url),
        ("method", "POST"),
        ("headers", headers),
    )
    for field, expected in expected_fields:
        assert request_kwargs[field] == expected

    raw_body = request_kwargs["body"]
    assert raw_body is not None

    decoded_body = json.loads(raw_body.decode("utf-8"))
    assert decoded_body == request_data
@classmethod
def assert_underlying_credentials_refresh(
    cls,
    credentials,
    audience,
    subject_token,
    subject_token_type,
    token_url,
    service_account_impersonation_url=None,
    basic_auth_encoding=None,
    quota_project_id=None,
    used_scopes=None,
    credential_data=None,
    scopes=None,
    default_scopes=None,
    workforce_pool_user_project=None,
):
    """Refresh ``credentials`` against a mocked transport and verify each call.

    The helper queues the responses a refresh is expected to consume (an
    optional credential-source fetch, the STS token exchange, and an optional
    service account impersonation call), runs ``credentials.refresh`` and
    then asserts that:

    * exactly the queued number of requests was issued,
    * each request carried the expected URL, headers and payload, and
    * the resulting token, quota project, scopes and default scopes match.
    """
    impersonating = bool(service_account_impersonation_url)

    # --- Expected STS token exchange request/response. ---
    sts_response = cls.SUCCESS_RESPONSE.copy()
    sts_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    if basic_auth_encoding:
        sts_headers["Authorization"] = "Basic " + basic_auth_encoding

    sa_flag = "true" if credentials._service_account_impersonation_url else "false"
    if not credentials._credential_source:
        source_kind = "programmatic"
    elif credentials._credential_source_file:
        source_kind = "file"
    else:
        source_kind = "url"
    # Key order is kept as sa-impersonation, config-lifetime, source in case
    # the header builder depends on dict ordering.
    byoid_options = {
        "sa-impersonation": sa_flag,
        "config-lifetime": "false",
        "source": source_kind,
    }
    sts_headers["x-goog-api-client"] = metrics.byoid_metrics_header(byoid_options)

    if impersonating:
        requested_scope = "https://www.googleapis.com/auth/iam"
    else:
        requested_scope = " ".join(used_scopes or [])
    sts_request_data = {
        "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
        "audience": audience,
        "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
        "scope": requested_scope,
        "subject_token": subject_token,
        "subject_token_type": subject_token_type,
    }
    if workforce_pool_user_project:
        user_project_json = json.dumps({"userProject": workforce_pool_user_project})
        sts_request_data["options"] = urllib.parse.quote(user_project_json)

    impersonation_metrics_header = (
        "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/imp"
    )

    # --- Expected service account impersonation request/response. ---
    if impersonating:
        expiry = _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(
            seconds=3600
        )
        sa_response = {
            "accessToken": "SA_ACCESS_TOKEN",
            "expireTime": expiry.isoformat("T") + "Z",
        }
        sa_headers = {
            "Content-Type": "application/json",
            "authorization": "Bearer {}".format(sts_response["access_token"]),
            "x-goog-api-client": impersonation_metrics_header,
            "x-allowed-locations": "0x0",
        }
        sa_request_data = {
            "delegates": None,
            "scope": used_scopes,
            "lifetime": "3600s",
        }

    # --- Queue the mocked responses in the order refresh() consumes them. ---
    queued = []
    if credential_data:
        queued.append((http_client.OK, credential_data))
    sts_call_index = len(queued)
    queued.append((http_client.OK, sts_response))
    if impersonating:
        sa_call_index = len(queued)
        queued.append((http_client.OK, sa_response))

    # make_mock_request takes a flat status, data, status, data, ... sequence.
    flat_args = []
    for status_and_payload in queued:
        flat_args.extend(status_and_payload)
    request = cls.make_mock_request(*flat_args)

    with mock.patch(
        "google.auth.metrics.token_request_access_token_impersonate",
        return_value=impersonation_metrics_header,
    ):
        credentials.refresh(request)

    # --- Verify the recorded calls. ---
    recorded_calls = request.call_args_list
    assert len(recorded_calls) == len(queued)

    if credential_data:
        cls.assert_credential_request_kwargs(recorded_calls[0][1], None)

    cls.assert_token_request_kwargs(
        recorded_calls[sts_call_index][1],
        sts_headers,
        sts_request_data,
        token_url,
    )

    if impersonating:
        cls.assert_impersonation_request_kwargs(
            recorded_calls[sa_call_index][1],
            sa_headers,
            sa_request_data,
            service_account_impersonation_url,
        )
        assert credentials.token == sa_response["accessToken"]
    else:
        assert credentials.token == sts_response["access_token"]

    assert credentials.quota_project_id == quota_project_id
    assert credentials.scopes == scopes
    assert credentials.default_scopes == default_scopes
@classmethod
def make_credentials(
    cls,
    audience=AUDIENCE,
    subject_token_type=SUBJECT_TOKEN_TYPE,
    token_url=TOKEN_URL,
    token_info_url=TOKEN_INFO_URL,
    client_id=None,
    client_secret=None,
    quota_project_id=None,
    scopes=None,
    default_scopes=None,
    service_account_impersonation_url=None,
    credential_source=None,
    subject_token_supplier=None,
    workforce_pool_user_project=None,
):
    """Construct ``identity_pool.Credentials`` with test-friendly defaults.

    Every argument is forwarded unchanged, by keyword, to the constructor.
    """
    constructor_kwargs = {
        "audience": audience,
        "subject_token_type": subject_token_type,
        "token_url": token_url,
        "token_info_url": token_info_url,
        "service_account_impersonation_url": service_account_impersonation_url,
        "credential_source": credential_source,
        "subject_token_supplier": subject_token_supplier,
        "client_id": client_id,
        "client_secret": client_secret,
        "quota_project_id": quota_project_id,
        "scopes": scopes,
        "default_scopes": default_scopes,
        "workforce_pool_user_project": workforce_pool_user_project,
    }
    return identity_pool.Credentials(**constructor_kwargs)
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
def test_from_info_full_options(self, mock_init):
    """from_info with every option set calls the constructor with matching kwargs."""
    info = {
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "token_info_url": TOKEN_INFO_URL,
        "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
        "service_account_impersonation": {"token_lifetime_seconds": 2800},
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "quota_project_id": QUOTA_PROJECT_ID,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
    }
    expected_init_kwargs = {
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "token_info_url": TOKEN_INFO_URL,
        "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
        "service_account_impersonation_options": {"token_lifetime_seconds": 2800},
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
        "subject_token_supplier": None,
        "quota_project_id": QUOTA_PROJECT_ID,
        "workforce_pool_user_project": None,
        "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
    }

    created = identity_pool.Credentials.from_info(info)

    # Confirm identity_pool.Credentials instantiated with expected attributes.
    assert isinstance(created, identity_pool.Credentials)
    mock_init.assert_called_once_with(**expected_init_kwargs)
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
def test_from_info_required_options_only(self, mock_init):
    """from_info with only required keys passes None/empty values for the rest."""
    info = {
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
    }
    expected_init_kwargs = {
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "token_info_url": None,
        "service_account_impersonation_url": None,
        "service_account_impersonation_options": {},
        "client_id": None,
        "client_secret": None,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
        "subject_token_supplier": None,
        "quota_project_id": None,
        "workforce_pool_user_project": None,
        "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
    }

    created = identity_pool.Credentials.from_info(info)

    # Confirm identity_pool.Credentials instantiated with expected attributes.
    assert isinstance(created, identity_pool.Credentials)
    mock_init.assert_called_once_with(**expected_init_kwargs)
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
def test_from_info_supplier(self, mock_init):
    """from_info forwards a subject token supplier and passes no credential source."""
    token_supplier = TestSubjectTokenSupplier()
    info = {
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "subject_token_supplier": token_supplier,
    }
    expected_init_kwargs = {
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "token_info_url": None,
        "service_account_impersonation_url": None,
        "service_account_impersonation_options": {},
        "client_id": None,
        "client_secret": None,
        "credential_source": None,
        "subject_token_supplier": token_supplier,
        "quota_project_id": None,
        "workforce_pool_user_project": None,
        "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
    }

    created = identity_pool.Credentials.from_info(info)

    # Confirm identity_pool.Credentials instantiated with expected attributes.
    assert isinstance(created, identity_pool.Credentials)
    mock_init.assert_called_once_with(**expected_init_kwargs)
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
def test_from_info_workforce_pool(self, mock_init):
    """from_info forwards workforce_pool_user_project for a workforce audience."""
    info = {
        "audience": WORKFORCE_AUDIENCE,
        "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
        "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
    }
    expected_init_kwargs = {
        "audience": WORKFORCE_AUDIENCE,
        "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "token_info_url": None,
        "service_account_impersonation_url": None,
        "service_account_impersonation_options": {},
        "client_id": None,
        "client_secret": None,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
        "subject_token_supplier": None,
        "quota_project_id": None,
        "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
        "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
    }

    created = identity_pool.Credentials.from_info(info)

    # Confirm identity_pool.Credentials instantiated with expected attributes.
    assert isinstance(created, identity_pool.Credentials)
    mock_init.assert_called_once_with(**expected_init_kwargs)
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
def test_from_file_full_options(self, mock_init, tmpdir):
    """from_file with every option set calls the constructor with matching kwargs."""
    config_contents = {
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "token_info_url": TOKEN_INFO_URL,
        "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
        "service_account_impersonation": {"token_lifetime_seconds": 2800},
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "quota_project_id": QUOTA_PROJECT_ID,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
    }
    expected_init_kwargs = {
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "token_info_url": TOKEN_INFO_URL,
        "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
        "service_account_impersonation_options": {"token_lifetime_seconds": 2800},
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
        "subject_token_supplier": None,
        "quota_project_id": QUOTA_PROJECT_ID,
        "workforce_pool_user_project": None,
        "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
    }
    # Write the configuration to a temporary JSON file and load it by path.
    config_path = tmpdir.join("config.json")
    config_path.write(json.dumps(config_contents))

    created = identity_pool.Credentials.from_file(str(config_path))

    # Confirm identity_pool.Credentials instantiated with expected attributes.
    assert isinstance(created, identity_pool.Credentials)
    mock_init.assert_called_once_with(**expected_init_kwargs)
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
def test_from_file_required_options_only(self, mock_init, tmpdir):
    """from_file with only required keys passes None/empty values for the rest."""
    config_contents = {
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
    }
    expected_init_kwargs = {
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "token_info_url": None,
        "service_account_impersonation_url": None,
        "service_account_impersonation_options": {},
        "client_id": None,
        "client_secret": None,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
        "subject_token_supplier": None,
        "quota_project_id": None,
        "workforce_pool_user_project": None,
        "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
    }
    # Write the configuration to a temporary JSON file and load it by path.
    config_path = tmpdir.join("config.json")
    config_path.write(json.dumps(config_contents))

    created = identity_pool.Credentials.from_file(str(config_path))

    # Confirm identity_pool.Credentials instantiated with expected attributes.
    assert isinstance(created, identity_pool.Credentials)
    mock_init.assert_called_once_with(**expected_init_kwargs)
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
def test_from_file_workforce_pool(self, mock_init, tmpdir):
    """from_file forwards workforce_pool_user_project for a workforce audience."""
    config_contents = {
        "audience": WORKFORCE_AUDIENCE,
        "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
        "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
    }
    expected_init_kwargs = {
        "audience": WORKFORCE_AUDIENCE,
        "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "token_info_url": None,
        "service_account_impersonation_url": None,
        "service_account_impersonation_options": {},
        "client_id": None,
        "client_secret": None,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
        "subject_token_supplier": None,
        "quota_project_id": None,
        "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
        "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
    }
    # Write the configuration to a temporary JSON file and load it by path.
    config_path = tmpdir.join("config.json")
    config_path.write(json.dumps(config_contents))

    created = identity_pool.Credentials.from_file(str(config_path))

    # Confirm identity_pool.Credentials instantiated with expected attributes.
    assert isinstance(created, identity_pool.Credentials)
    mock_init.assert_called_once_with(**expected_init_kwargs)
def test_constructor_nonworkforce_with_workforce_pool_user_project(self):
    """A workforce user project combined with a non-workforce audience raises ValueError."""
    constructor_kwargs = {
        "audience": AUDIENCE,
        "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
    }

    with pytest.raises(ValueError) as raised:
        self.make_credentials(**constructor_kwargs)

    expected_message = (
        "workforce_pool_user_project should not be set for non-workforce "
        "pool credentials"
    )
    assert raised.match(expected_message)
def test_constructor_invalid_options(self):
    """A credential_source with only an unsupported key raises ValueError."""
    unsupported_source = {"unsupported": "value"}

    with pytest.raises(ValueError) as raised:
        self.make_credentials(credential_source=unsupported_source)

    expected_pattern = r"Missing credential_source"
    assert raised.match(expected_pattern)
def test_constructor_invalid_options_url_and_file(self):
    """A credential_source with both a URL and a file is rejected as ambiguous."""
    conflicting_source = {}
    conflicting_source["url"] = self.CREDENTIAL_URL
    conflicting_source["file"] = SUBJECT_TOKEN_TEXT_FILE

    with pytest.raises(ValueError) as raised:
        self.make_credentials(credential_source=conflicting_source)

    expected_pattern = r"Ambiguous credential_source"
    assert raised.match(expected_pattern)
def test_constructor_invalid_options_url_and_certificate(self):
    """A credential_source with both a URL and a certificate is rejected as ambiguous."""
    certificate_entry = {"certificate": {"use_default_certificate_config": True}}
    conflicting_source = {}
    conflicting_source["url"] = self.CREDENTIAL_URL
    conflicting_source["certificate"] = certificate_entry

    with pytest.raises(ValueError) as raised:
        self.make_credentials(credential_source=conflicting_source)

    expected_pattern = r"Ambiguous credential_source"
    assert raised.match(expected_pattern)
def test_constructor_invalid_options_file_and_certificate(self):
    """A credential_source with both a file and a certificate is rejected as ambiguous."""
    certificate_entry = {"certificate": {"use_default_certificate": True}}
    conflicting_source = {}
    conflicting_source["file"] = SUBJECT_TOKEN_TEXT_FILE
    conflicting_source["certificate"] = certificate_entry

    with pytest.raises(ValueError) as raised:
        self.make_credentials(credential_source=conflicting_source)

    expected_pattern = r"Ambiguous credential_source"
    assert raised.match(expected_pattern)
def test_constructor_invalid_options_url_file_and_certificate(self):
    """A credential_source with a file, a URL and a certificate is rejected as ambiguous."""
    certificate_entry = {"certificate": {"use_default_certificate": True}}
    conflicting_source = {}
    conflicting_source["file"] = SUBJECT_TOKEN_TEXT_FILE
    conflicting_source["url"] = self.CREDENTIAL_URL
    conflicting_source["certificate"] = certificate_entry

    with pytest.raises(ValueError) as raised:
        self.make_credentials(credential_source=conflicting_source)

    expected_pattern = r"Ambiguous credential_source"
    assert raised.match(expected_pattern)
def test_constructor_invalid_options_environment_id(self):
    """A credential_source carrying an environment_id field raises ValueError."""
    source_with_environment_id = {}
    source_with_environment_id["url"] = self.CREDENTIAL_URL
    source_with_environment_id["environment_id"] = "aws1"

    with pytest.raises(ValueError) as raised:
        self.make_credentials(credential_source=source_with_environment_id)

    expected_pattern = (
        r"Invalid Identity Pool credential_source field 'environment_id'"
    )
    assert raised.match(expected_pattern)
def test_constructor_invalid_credential_source(self):
    """A credential_source that is not a dict raises ValueError."""
    not_a_dict = "non-dict"

    with pytest.raises(ValueError) as raised:
        self.make_credentials(credential_source=not_a_dict)

    expected_pattern = (
        r"Invalid credential_source. The credential_source is not a dict."
    )
    assert raised.match(expected_pattern)
def test_constructor_invalid_no_credential_source_or_supplier(self):
    """Constructing with neither a credential source nor a supplier raises ValueError."""
    expected_pattern = (
        r"A valid credential source or a subject token supplier must be provided."
    )

    with pytest.raises(ValueError) as raised:
        self.make_credentials()

    assert raised.match(expected_pattern)
def test_constructor_invalid_both_credential_source_and_supplier(self):
    """Passing both a credential source and a supplier raises ValueError."""
    token_supplier = TestSubjectTokenSupplier()
    constructor_kwargs = {
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
        "subject_token_supplier": token_supplier,
    }

    with pytest.raises(ValueError) as raised:
        self.make_credentials(**constructor_kwargs)

    expected_pattern = r"Identity pool credential cannot have both a credential source and a subject token supplier."
    assert raised.match(expected_pattern)
def test_constructor_invalid_credential_source_format_type(self):
    """A credential_source format type of 'xml' raises ValueError."""
    xml_format_source = {}
    xml_format_source["file"] = "test.txt"
    xml_format_source["format"] = {"type": "xml"}

    with pytest.raises(ValueError) as raised:
        self.make_credentials(credential_source=xml_format_source)

    expected_pattern = r"Invalid credential_source format 'xml'"
    assert raised.match(expected_pattern)
def test_constructor_missing_subject_token_field_name(self):
    """A JSON-format credential_source without subject_token_field_name raises ValueError."""
    json_source_without_field = {}
    json_source_without_field["file"] = "test.txt"
    json_source_without_field["format"] = {"type": "json"}

    with pytest.raises(ValueError) as raised:
        self.make_credentials(credential_source=json_source_without_field)

    expected_pattern = (
        r"Missing subject_token_field_name for JSON credential_source format"
    )
    assert raised.match(expected_pattern)
def test_constructor_default_and_file_location_certificate(self):
    # Requesting the default certificate config AND giving an explicit
    # config location at the same time must raise a ValueError.
    certificate_options = {
        "use_default_certificate_config": True,
        "certificate_config_location": "test",
    }
    source = {"certificate": certificate_options}

    with pytest.raises(ValueError) as raised:
        self.make_credentials(credential_source=source)

    assert raised.match(r"Invalid certificate configuration")
def test_constructor_no_default_or_file_location_certificate(self):
    # Disabling the default certificate config without supplying a config
    # location must raise a ValueError.
    certificate_options = {"use_default_certificate_config": False}
    source = {"certificate": certificate_options}

    with pytest.raises(ValueError) as raised:
        self.make_credentials(credential_source=source)

    assert raised.match(r"Invalid certificate configuration")
def test_info_with_workforce_pool_user_project(self):
    # The `info` mapping of a workforce credential must include the
    # workforce_pool_user_project it was constructed with.
    workforce_credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy(),
        audience=WORKFORCE_AUDIENCE,
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
    )

    expected_info = {
        "type": "external_account",
        "audience": WORKFORCE_AUDIENCE,
        "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "token_info_url": TOKEN_INFO_URL,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL,
        "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
        "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
    }
    assert workforce_credentials.info == expected_info
def test_info_with_file_credential_source(self):
    # Checks the `info` mapping produced for a text credential source.
    # NOTE(review): despite the "file" in the test name, the source used
    # here is CREDENTIAL_SOURCE_TEXT_URL - confirm whether that is intended.
    source = self.CREDENTIAL_SOURCE_TEXT_URL.copy()
    credentials = self.make_credentials(credential_source=source)

    expected_info = {
        "type": "external_account",
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "token_info_url": TOKEN_INFO_URL,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL,
        "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
    }
    assert credentials.info == expected_info
def test_info_with_url_credential_source(self):
    # Checks the `info` mapping produced for a JSON URL credential source.
    source = self.CREDENTIAL_SOURCE_JSON_URL.copy()
    credentials = self.make_credentials(credential_source=source)

    expected_info = {
        "type": "external_account",
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "token_info_url": TOKEN_INFO_URL,
        "credential_source": self.CREDENTIAL_SOURCE_JSON_URL,
        "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
    }
    assert credentials.info == expected_info
def test_info_with_certificate_credential_source(self):
    # Checks the `info` mapping produced for a certificate credential source.
    source = self.CREDENTIAL_SOURCE_CERTIFICATE.copy()
    credentials = self.make_credentials(credential_source=source)

    expected_info = {
        "type": "external_account",
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "token_info_url": TOKEN_INFO_URL,
        "credential_source": self.CREDENTIAL_SOURCE_CERTIFICATE,
        "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
    }
    assert credentials.info == expected_info
def test_info_with_non_default_certificate_credential_source(self):
    # Checks the `info` mapping produced for the non-default certificate
    # credential source.
    source = self.CREDENTIAL_SOURCE_CERTIFICATE_NOT_DEFAULT.copy()
    credentials = self.make_credentials(credential_source=source)

    expected_info = {
        "type": "external_account",
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "token_info_url": TOKEN_INFO_URL,
        "credential_source": self.CREDENTIAL_SOURCE_CERTIFICATE_NOT_DEFAULT,
        "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
    }
    assert credentials.info == expected_info
def test_info_with_default_token_url(self):
    # Builds the credential directly (no token_url argument) and checks
    # that `info` reports TOKEN_URL and no token_info_url entry.
    constructor_kwargs = dict(
        audience=AUDIENCE,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy(),
    )
    credentials = identity_pool.Credentials(**constructor_kwargs)

    expected_info = {
        "type": "external_account",
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL,
        "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
    }
    assert credentials.info == expected_info
def test_info_with_default_token_url_with_universe_domain(self):
    # Builds the credential directly with a custom universe domain and
    # checks that `info` reports a token URL on that domain.
    constructor_kwargs = dict(
        audience=AUDIENCE,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy(),
        universe_domain="testdomain.org",
    )
    credentials = identity_pool.Credentials(**constructor_kwargs)

    expected_info = {
        "type": "external_account",
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": "https://sts.testdomain.org/v1/token",
        "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL,
        "universe_domain": "testdomain.org",
    }
    assert credentials.info == expected_info
def test_retrieve_subject_token_missing_subject_token(self, tmpdir):
    # An empty text file contains no subject token, so retrieval must fail
    # with a RefreshError.
    token_path = tmpdir.join("empty.txt")
    token_path.write("")
    credentials = self.make_credentials(
        credential_source={"file": str(token_path)}
    )

    with pytest.raises(exceptions.RefreshError) as raised:
        credentials.retrieve_subject_token(None)

    assert raised.match(r"Missing subject_token in the credential_source file")
def test_retrieve_subject_token_text_file(self):
    # The token read from the text file source must equal the fixture token.
    source = self.CREDENTIAL_SOURCE_TEXT
    credentials = self.make_credentials(credential_source=source)

    assert credentials.retrieve_subject_token(None) == TEXT_FILE_SUBJECT_TOKEN
def test_retrieve_subject_token_json_file(self):
    # The token read from the JSON file source must equal the fixture token.
    source = self.CREDENTIAL_SOURCE_JSON
    credentials = self.make_credentials(credential_source=source)

    assert credentials.retrieve_subject_token(None) == JSON_FILE_SUBJECT_TOKEN
def test_retrieve_subject_token_certificate(self):
    # For a certificate credential source the retrieved subject token is
    # expected to be the empty string.
    source = self.CREDENTIAL_SOURCE_CERTIFICATE
    credentials = self.make_credentials(credential_source=source)

    assert credentials.retrieve_subject_token(None) == ""
def test_retrieve_subject_token_json_file_invalid_field_name(self):
    # Looking up a field that does not exist in the JSON token file must
    # raise a RefreshError naming both the file and the missing key.
    import re  # local import: only needed to escape the expected message

    credential_source = {
        "file": SUBJECT_TOKEN_JSON_FILE,
        "format": {"type": "json", "subject_token_field_name": "not_found"},
    }
    credentials = self.make_credentials(credential_source=credential_source)
    expected_message = (
        "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
            SUBJECT_TOKEN_JSON_FILE, "not_found"
        )
    )

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.retrieve_subject_token(None)

    # excinfo.match() interprets its argument as a regular expression, so
    # the interpolated file path is escaped to be compared literally (a path
    # with backslashes or other metacharacters would otherwise break it).
    assert excinfo.match(re.escape(expected_message))
def test_retrieve_subject_token_invalid_json(self, tmpdir):
    # A token file holding malformed JSON ("{") must surface as a
    # RefreshError naming both the file and the requested key.
    import re  # local import: only needed to escape the expected message

    invalid_json_file = tmpdir.join("invalid.json")
    invalid_json_file.write("{")
    credential_source = {
        "file": str(invalid_json_file),
        "format": {"type": "json", "subject_token_field_name": "access_token"},
    }
    credentials = self.make_credentials(credential_source=credential_source)
    expected_message = (
        "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
            str(invalid_json_file), "access_token"
        )
    )

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.retrieve_subject_token(None)

    # The temporary path is interpolated into a pattern that excinfo.match()
    # treats as a regular expression; escape it so it is matched literally
    # regardless of the characters the platform puts in temp paths.
    assert excinfo.match(re.escape(expected_message))
def test_retrieve_subject_token_file_not_found(self):
    # Pointing the source at a file that does not exist must raise a
    # RefreshError mentioning that file.
    credentials = self.make_credentials(
        credential_source={"file": "./not_found.txt"}
    )

    with pytest.raises(exceptions.RefreshError) as raised:
        credentials.retrieve_subject_token(None)

    assert raised.match(r"File './not_found.txt' was not found")
def test_token_info_url(self):
    # The token_info_url property must report TOKEN_INFO_URL for
    # credentials built by make_credentials.
    source = self.CREDENTIAL_SOURCE_JSON
    credentials = self.make_credentials(credential_source=source)

    assert credentials.token_info_url == TOKEN_INFO_URL
def test_token_info_url_custom(self):
    # Every URL in VALID_TOKEN_URLS must be accepted as a custom
    # token_info_url and reported back unchanged.
    for base_url in VALID_TOKEN_URLS:
        introspect_url = base_url + "/introspect"
        credentials = self.make_credentials(
            credential_source=self.CREDENTIAL_SOURCE_JSON.copy(),
            token_info_url=introspect_url,
        )

        assert credentials.token_info_url == introspect_url
def test_token_info_url_negative(self):
    # When token_info_url is explicitly None the property must be falsy.
    source = self.CREDENTIAL_SOURCE_JSON.copy()
    credentials = self.make_credentials(
        credential_source=source, token_info_url=None
    )

    assert not credentials.token_info_url
def test_token_url_custom(self):
    # Every URL in VALID_TOKEN_URLS must be accepted as a custom token_url
    # and stored unchanged on the credential.
    for base_url in VALID_TOKEN_URLS:
        custom_token_url = base_url + "/token"
        credentials = self.make_credentials(
            credential_source=self.CREDENTIAL_SOURCE_JSON.copy(),
            token_url=custom_token_url,
        )

        assert credentials._token_url == custom_token_url
def test_service_account_impersonation_url_custom(self):
    # Every URL in VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS, combined with
    # the impersonation route, must be accepted and stored unchanged.
    for base_url in VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS:
        impersonation_url = base_url + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE
        credentials = self.make_credentials(
            credential_source=self.CREDENTIAL_SOURCE_JSON.copy(),
            service_account_impersonation_url=impersonation_url,
        )

        assert credentials._service_account_impersonation_url == impersonation_url
def test_refresh_text_file_success_without_impersonation_ignore_default_scopes(
    self,
):
    # Text-file source with client auth and no impersonation: the explicit
    # scopes are the ones used, the default scopes are ignored.
    ignored_defaults = ["ignored"]
    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        scopes=SCOPES,
        default_scopes=ignored_defaults,
    )

    expected = dict(
        audience=AUDIENCE,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=None,
        basic_auth_encoding=BASIC_AUTH_ENCODING,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=["ignored"],
    )
    self.assert_underlying_credentials_refresh(credentials=credentials, **expected)
def test_refresh_workforce_success_with_client_auth_without_impersonation(self):
    # Workforce credential with client auth: the configured
    # workforce_pool_user_project is ignored in favor of client auth, so the
    # refresh is expected to use no user project.
    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        audience=WORKFORCE_AUDIENCE,
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        scopes=SCOPES,
        workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
    )

    expected = dict(
        audience=WORKFORCE_AUDIENCE,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=None,
        basic_auth_encoding=BASIC_AUTH_ENCODING,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        workforce_pool_user_project=None,
    )
    self.assert_underlying_credentials_refresh(credentials=credentials, **expected)
def test_refresh_workforce_success_with_client_auth_and_no_workforce_project(self):
    # Workforce credential with client auth and no user project: a user
    # project is not needed when client auth is used.
    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        audience=WORKFORCE_AUDIENCE,
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        scopes=SCOPES,
        workforce_pool_user_project=None,
    )

    expected = dict(
        audience=WORKFORCE_AUDIENCE,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=None,
        basic_auth_encoding=BASIC_AUTH_ENCODING,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        workforce_pool_user_project=None,
    )
    self.assert_underlying_credentials_refresh(credentials=credentials, **expected)
def test_refresh_workforce_success_without_client_auth_without_impersonation(self):
    # Workforce credential without client auth: the user project is not
    # ignored and is expected to be used during refresh.
    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        audience=WORKFORCE_AUDIENCE,
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        client_id=None,
        client_secret=None,
        scopes=SCOPES,
        workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
    )

    expected = dict(
        audience=WORKFORCE_AUDIENCE,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=None,
        basic_auth_encoding=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
    )
    self.assert_underlying_credentials_refresh(credentials=credentials, **expected)
def test_refresh_workforce_success_without_client_auth_with_impersonation(self):
    # Workforce credential without client auth but with service account
    # impersonation: the user project is not ignored.
    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        audience=WORKFORCE_AUDIENCE,
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        client_id=None,
        client_secret=None,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        scopes=SCOPES,
        workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
    )

    expected = dict(
        audience=WORKFORCE_AUDIENCE,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        basic_auth_encoding=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
    )
    self.assert_underlying_credentials_refresh(credentials=credentials, **expected)
def test_refresh_text_file_success_without_impersonation_use_default_scopes(self):
    # Text-file source with client auth and no user scopes: the default
    # scopes are the ones used.
    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        scopes=None,
        default_scopes=SCOPES,
    )

    expected = dict(
        audience=AUDIENCE,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=None,
        basic_auth_encoding=BASIC_AUTH_ENCODING,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=None,
        default_scopes=SCOPES,
    )
    self.assert_underlying_credentials_refresh(credentials=credentials, **expected)
def test_refresh_text_file_success_with_impersonation_ignore_default_scopes(self):
    # Text-file source with service account impersonation (no client
    # id/secret is passed): explicit scopes win over the default scopes.
    ignored_defaults = ["ignored"]
    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        scopes=SCOPES,
        default_scopes=ignored_defaults,
    )

    expected = dict(
        audience=AUDIENCE,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        basic_auth_encoding=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=["ignored"],
    )
    self.assert_underlying_credentials_refresh(credentials=credentials, **expected)
def test_refresh_text_file_success_with_impersonation_use_default_scopes(self):
    # Text-file source with service account impersonation and no user
    # scopes: the default scopes are the ones used.
    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        scopes=None,
        default_scopes=SCOPES,
    )

    expected = dict(
        audience=AUDIENCE,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        basic_auth_encoding=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=None,
        default_scopes=SCOPES,
    )
    self.assert_underlying_credentials_refresh(credentials=credentials, **expected)
def test_refresh_json_file_success_without_impersonation(self):
    # JSON-file source with client auth and no impersonation.
    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_JSON,
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        scopes=SCOPES,
    )

    expected = dict(
        audience=AUDIENCE,
        subject_token=JSON_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=None,
        basic_auth_encoding=BASIC_AUTH_ENCODING,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=None,
    )
    self.assert_underlying_credentials_refresh(credentials=credentials, **expected)
def test_refresh_json_file_success_with_impersonation(self):
    # JSON-file source with service account impersonation; no client
    # id/secret is passed, so no basic auth encoding is expected.
    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_JSON,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        scopes=SCOPES,
    )

    expected = dict(
        audience=AUDIENCE,
        subject_token=JSON_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        basic_auth_encoding=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=None,
    )
    self.assert_underlying_credentials_refresh(credentials=credentials, **expected)
def test_refresh_with_retrieve_subject_token_error(self):
    # A failure to extract the subject token from the JSON file must
    # propagate out of refresh() as a RefreshError.
    import re  # local import: only needed to escape the expected message

    credential_source = {
        "file": SUBJECT_TOKEN_JSON_FILE,
        "format": {"type": "json", "subject_token_field_name": "not_found"},
    }
    credentials = self.make_credentials(credential_source=credential_source)
    expected_message = (
        "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
            SUBJECT_TOKEN_JSON_FILE, "not_found"
        )
    )

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.refresh(None)

    # excinfo.match() interprets its argument as a regular expression;
    # escape the message so the interpolated file path is matched literally.
    assert excinfo.match(re.escape(expected_message))
def test_retrieve_subject_token_from_url(self):
    # The text token served by the mocked URL must be returned as-is, and
    # the request must have been issued without extra headers.
    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_TEXT_URL
    )
    mock_request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN)

    retrieved = credentials.retrieve_subject_token(mock_request)

    assert retrieved == TEXT_FILE_SUBJECT_TOKEN
    first_call_kwargs = mock_request.call_args_list[0][1]
    self.assert_credential_request_kwargs(first_call_kwargs, None)
def test_retrieve_subject_token_from_url_with_headers(self):
    # Headers configured in the credential source must be forwarded with
    # the request that fetches the text token.
    source = {"url": self.CREDENTIAL_URL, "headers": {"foo": "bar"}}
    credentials = self.make_credentials(credential_source=source)
    mock_request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN)

    retrieved = credentials.retrieve_subject_token(mock_request)

    assert retrieved == TEXT_FILE_SUBJECT_TOKEN
    first_call_kwargs = mock_request.call_args_list[0][1]
    self.assert_credential_request_kwargs(first_call_kwargs, {"foo": "bar"})
def test_retrieve_subject_token_from_url_json(self):
    # The token must be extracted from the JSON payload served by the
    # mocked URL, and the request must carry no extra headers.
    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_JSON_URL
    )
    mock_request = self.make_mock_request(token_data=JSON_FILE_CONTENT)

    retrieved = credentials.retrieve_subject_token(mock_request)

    assert retrieved == JSON_FILE_SUBJECT_TOKEN
    first_call_kwargs = mock_request.call_args_list[0][1]
    self.assert_credential_request_kwargs(first_call_kwargs, None)
def test_retrieve_subject_token_from_url_json_with_headers(self):
    # JSON URL source with custom headers: the token is extracted from the
    # payload and the headers are forwarded with the request.
    custom_headers = {"foo": "bar"}
    source = {
        "url": self.CREDENTIAL_URL,
        "format": {"type": "json", "subject_token_field_name": "access_token"},
        "headers": custom_headers,
    }
    credentials = self.make_credentials(credential_source=source)
    mock_request = self.make_mock_request(token_data=JSON_FILE_CONTENT)

    retrieved = credentials.retrieve_subject_token(mock_request)

    assert retrieved == JSON_FILE_SUBJECT_TOKEN
    first_call_kwargs = mock_request.call_args_list[0][1]
    self.assert_credential_request_kwargs(first_call_kwargs, {"foo": "bar"})
def test_retrieve_subject_token_from_url_not_found(self):
    # A 404 response from the token URL must raise a RefreshError.
    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_TEXT_URL
    )
    failing_request = self.make_mock_request(
        token_status=404, token_data=JSON_FILE_CONTENT
    )

    with pytest.raises(exceptions.RefreshError) as raised:
        credentials.retrieve_subject_token(failing_request)

    assert raised.match("Unable to retrieve Identity Pool subject token")
def test_retrieve_subject_token_from_url_json_invalid_field(self):
    # Looking up a field that is absent from the JSON served by the URL
    # must raise a RefreshError naming the URL and the missing key.
    import re  # local import: only needed to escape the expected message

    credential_source = {
        "url": self.CREDENTIAL_URL,
        "format": {"type": "json", "subject_token_field_name": "not_found"},
    }
    credentials = self.make_credentials(credential_source=credential_source)
    expected_message = (
        "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
            self.CREDENTIAL_URL, "not_found"
        )
    )

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.retrieve_subject_token(
            self.make_mock_request(token_data=JSON_FILE_CONTENT)
        )

    # excinfo.match() interprets its argument as a regular expression;
    # escape the message so characters in the URL (".", "?", ...) are
    # compared literally instead of acting as regex metacharacters.
    assert excinfo.match(re.escape(expected_message))
def test_retrieve_subject_token_from_url_json_invalid_format(self):
    # A malformed JSON payload ("{") served by the URL must raise a
    # RefreshError naming the URL and the requested key.
    import re  # local import: only needed to escape the expected message

    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_JSON_URL
    )
    expected_message = (
        "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
            self.CREDENTIAL_URL, "access_token"
        )
    )

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.retrieve_subject_token(self.make_mock_request(token_data="{"))

    # excinfo.match() interprets its argument as a regular expression;
    # escape the message so the interpolated URL is matched literally.
    assert excinfo.match(re.escape(expected_message))
def test_refresh_text_file_success_without_impersonation_url(self):
    # Text URL source with client auth and no impersonation.
    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_TEXT_URL,
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        scopes=SCOPES,
    )

    expected = dict(
        audience=AUDIENCE,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=None,
        basic_auth_encoding=BASIC_AUTH_ENCODING,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=None,
        credential_data=TEXT_FILE_SUBJECT_TOKEN,
    )
    self.assert_underlying_credentials_refresh(credentials=credentials, **expected)
def test_refresh_text_file_success_with_impersonation_url(self):
    # Text URL source with service account impersonation; no client
    # id/secret is passed, so no basic auth encoding is expected.
    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_TEXT_URL,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        scopes=SCOPES,
    )

    expected = dict(
        audience=AUDIENCE,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        basic_auth_encoding=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=None,
        credential_data=TEXT_FILE_SUBJECT_TOKEN,
    )
    self.assert_underlying_credentials_refresh(credentials=credentials, **expected)
def test_refresh_json_file_success_without_impersonation_url(self):
    # JSON URL source with client auth and no impersonation.
    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_JSON_URL,
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        scopes=SCOPES,
    )

    expected = dict(
        audience=AUDIENCE,
        subject_token=JSON_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=None,
        basic_auth_encoding=BASIC_AUTH_ENCODING,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=None,
        credential_data=JSON_FILE_CONTENT,
    )
    self.assert_underlying_credentials_refresh(credentials=credentials, **expected)
def test_refresh_json_file_success_with_impersonation_url(self):
    # JSON URL source with service account impersonation; no client
    # id/secret is passed, so no basic auth encoding is expected.
    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_JSON_URL,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        scopes=SCOPES,
    )

    expected = dict(
        audience=AUDIENCE,
        subject_token=JSON_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        basic_auth_encoding=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=None,
        credential_data=JSON_FILE_CONTENT,
    )
    self.assert_underlying_credentials_refresh(credentials=credentials, **expected)
def test_refresh_with_retrieve_subject_token_error_url(self):
    # A failure to extract the subject token from the URL's JSON payload
    # must propagate out of refresh() as a RefreshError.
    import re  # local import: only needed to escape the expected message

    credential_source = {
        "url": self.CREDENTIAL_URL,
        "format": {"type": "json", "subject_token_field_name": "not_found"},
    }
    credentials = self.make_credentials(credential_source=credential_source)
    expected_message = (
        "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
            self.CREDENTIAL_URL, "not_found"
        )
    )

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.refresh(self.make_mock_request(token_data=JSON_FILE_CONTENT))

    # excinfo.match() interprets its argument as a regular expression;
    # escape the message so the interpolated URL is matched literally.
    assert excinfo.match(re.escape(expected_message))
def test_retrieve_subject_token_supplier(self):
    # The token configured on the supplier must be what the credential
    # returns from retrieve_subject_token.
    token_supplier = TestSubjectTokenSupplier(subject_token=JSON_FILE_SUBJECT_TOKEN)
    credentials = self.make_credentials(subject_token_supplier=token_supplier)

    assert credentials.retrieve_subject_token(None) == JSON_FILE_SUBJECT_TOKEN
def test_retrieve_subject_token_supplier_correct_context(self):
    # The supplier is given the context it should expect; this test makes
    # no assertion itself.
    # NOTE(review): presumably TestSubjectTokenSupplier verifies the context
    # it receives against expected_context - confirm in its definition.
    expected_context = external_account.SupplierContext(
        SUBJECT_TOKEN_TYPE, AUDIENCE
    )
    token_supplier = TestSubjectTokenSupplier(
        subject_token=JSON_FILE_SUBJECT_TOKEN,
        expected_context=expected_context,
    )
    credentials = self.make_credentials(subject_token_supplier=token_supplier)

    credentials.retrieve_subject_token(None)
def test_retrieve_subject_token_supplier_error(self):
    # An exception configured on the supplier must surface from refresh()
    # as a RefreshError with the same message.
    supplier_failure = exceptions.RefreshError("test error")
    token_supplier = TestSubjectTokenSupplier(
        subject_token_exception=supplier_failure
    )
    credentials = self.make_credentials(subject_token_supplier=token_supplier)

    with pytest.raises(exceptions.RefreshError) as raised:
        credentials.refresh(self.make_mock_request(token_data=JSON_FILE_CONTENT))

    assert raised.match("test error")
def test_refresh_success_supplier_with_impersonation_url(self):
    # Supplier-backed credential with service account impersonation: the
    # refresh must exchange the token returned by the supplier.
    supplier = TestSubjectTokenSupplier(subject_token=JSON_FILE_SUBJECT_TOKEN)
    credentials = self.make_credentials(
        subject_token_supplier=supplier,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        scopes=SCOPES,
    )

    self.assert_underlying_credentials_refresh(
        credentials=credentials,
        audience=AUDIENCE,
        # Expect exactly the token the supplier was configured with, rather
        # than a different fixture constant that merely shares its value.
        subject_token=JSON_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        basic_auth_encoding=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=None,
    )
def test_refresh_success_supplier_without_impersonation_url(self):
    # Supplier-backed credential without service account impersonation:
    # the refresh must exchange the token returned by the supplier.
    supplier = TestSubjectTokenSupplier(subject_token=JSON_FILE_SUBJECT_TOKEN)
    credentials = self.make_credentials(
        subject_token_supplier=supplier, scopes=SCOPES
    )

    self.assert_underlying_credentials_refresh(
        credentials=credentials,
        audience=AUDIENCE,
        # Expect exactly the token the supplier was configured with, rather
        # than a different fixture constant that merely shares its value.
        subject_token=JSON_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        basic_auth_encoding=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=None,
    )
@mock.patch(
    "google.auth.transport._mtls_helper._get_workload_cert_and_key_paths",
    return_value=("cert", "key"),
)
def test_get_mtls_certs(self, mock_get_workload_cert_and_key_paths):
    # With the workload cert/key lookup patched to return ("cert", "key"),
    # a certificate-sourced credential must report those same paths.
    source = self.CREDENTIAL_SOURCE_CERTIFICATE.copy()
    credentials = self.make_credentials(credential_source=source)

    cert_path, key_path = credentials._get_mtls_cert_and_key_paths()

    assert cert_path == "cert"
    assert key_path == "key"
def test_get_mtls_certs_invalid(self):
    # Asking for mTLS cert/key paths on a credential whose source has no
    # "certificate" section must raise a RefreshError.
    source = self.CREDENTIAL_SOURCE_TEXT.copy()
    credentials = self.make_credentials(credential_source=source)
    expected_error = 'The credential is not configured to use mtls requests. The credential should include a "certificate" section in the credential source.'

    with pytest.raises(exceptions.RefreshError) as raised:
        credentials._get_mtls_cert_and_key_paths()

    assert raised.match(expected_error)
|