123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365 |
- # Copyright 2016 Google LLC
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import json
- import os
- import mock
- import pytest # type: ignore
- from google.auth import _default
- from google.auth import api_key
- from google.auth import app_engine
- from google.auth import aws
- from google.auth import compute_engine
- from google.auth import credentials
- from google.auth import environment_vars
- from google.auth import exceptions
- from google.auth import external_account
- from google.auth import external_account_authorized_user
- from google.auth import identity_pool
- from google.auth import impersonated_credentials
- from google.auth import pluggable
- from google.oauth2 import gdch_credentials
- from google.oauth2 import service_account
- import google.oauth2.credentials
- import yatest.common as yc
- DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "data")
- AUTHORIZED_USER_FILE = os.path.join(DATA_DIR, "authorized_user.json")
- with open(AUTHORIZED_USER_FILE) as fh:
- AUTHORIZED_USER_FILE_DATA = json.load(fh)
- AUTHORIZED_USER_CLOUD_SDK_FILE = os.path.join(
- DATA_DIR, "authorized_user_cloud_sdk.json"
- )
- AUTHORIZED_USER_CLOUD_SDK_WITH_QUOTA_PROJECT_ID_FILE = os.path.join(
- DATA_DIR, "authorized_user_cloud_sdk_with_quota_project_id.json"
- )
- SERVICE_ACCOUNT_FILE = os.path.join(DATA_DIR, "service_account.json")
- CLIENT_SECRETS_FILE = os.path.join(DATA_DIR, "client_secrets.json")
- GDCH_SERVICE_ACCOUNT_FILE = os.path.join(DATA_DIR, "gdch_service_account.json")
- with open(SERVICE_ACCOUNT_FILE) as fh:
- SERVICE_ACCOUNT_FILE_DATA = json.load(fh)
- SUBJECT_TOKEN_TEXT_FILE = os.path.join(DATA_DIR, "external_subject_token.txt")
- TOKEN_URL = "https://sts.googleapis.com/v1/token"
- AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
- WORKFORCE_AUDIENCE = (
- "//iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID"
- )
- WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER"
- REGION_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone"
- SECURITY_CREDS_URL = "http://169.254.169.254/latest/meta-data/iam/security-credentials"
- CRED_VERIFICATION_URL = (
- "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15"
- )
- IDENTITY_POOL_DATA = {
- "type": "external_account",
- "audience": AUDIENCE,
- "subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
- "token_url": TOKEN_URL,
- "credential_source": {"file": SUBJECT_TOKEN_TEXT_FILE},
- }
- PLUGGABLE_DATA = {
- "type": "external_account",
- "audience": AUDIENCE,
- "subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
- "token_url": TOKEN_URL,
- "credential_source": {"executable": {"command": "command"}},
- }
- AWS_DATA = {
- "type": "external_account",
- "audience": AUDIENCE,
- "subject_token_type": "urn:ietf:params:aws:token-type:aws4_request",
- "token_url": TOKEN_URL,
- "credential_source": {
- "environment_id": "aws1",
- "region_url": REGION_URL,
- "url": SECURITY_CREDS_URL,
- "regional_cred_verification_url": CRED_VERIFICATION_URL,
- },
- }
- SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
- SERVICE_ACCOUNT_IMPERSONATION_URL = (
- "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
- + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
- )
- IMPERSONATED_IDENTITY_POOL_DATA = {
- "type": "external_account",
- "audience": AUDIENCE,
- "subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
- "token_url": TOKEN_URL,
- "credential_source": {"file": SUBJECT_TOKEN_TEXT_FILE},
- "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
- }
- IMPERSONATED_AWS_DATA = {
- "type": "external_account",
- "audience": AUDIENCE,
- "subject_token_type": "urn:ietf:params:aws:token-type:aws4_request",
- "token_url": TOKEN_URL,
- "credential_source": {
- "environment_id": "aws1",
- "region_url": REGION_URL,
- "url": SECURITY_CREDS_URL,
- "regional_cred_verification_url": CRED_VERIFICATION_URL,
- },
- "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
- }
- IDENTITY_POOL_WORKFORCE_DATA = {
- "type": "external_account",
- "audience": WORKFORCE_AUDIENCE,
- "subject_token_type": "urn:ietf:params:oauth:token-type:id_token",
- "token_url": TOKEN_URL,
- "credential_source": {"file": SUBJECT_TOKEN_TEXT_FILE},
- "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
- }
- IMPERSONATED_IDENTITY_POOL_WORKFORCE_DATA = {
- "type": "external_account",
- "audience": WORKFORCE_AUDIENCE,
- "subject_token_type": "urn:ietf:params:oauth:token-type:id_token",
- "token_url": TOKEN_URL,
- "credential_source": {"file": SUBJECT_TOKEN_TEXT_FILE},
- "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
- "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
- }
- IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE = os.path.join(
- DATA_DIR, "impersonated_service_account_authorized_user_source.json"
- )
- IMPERSONATED_SERVICE_ACCOUNT_WITH_QUOTA_PROJECT_FILE = os.path.join(
- DATA_DIR, "impersonated_service_account_with_quota_project.json"
- )
- IMPERSONATED_SERVICE_ACCOUNT_SERVICE_ACCOUNT_SOURCE_FILE = os.path.join(
- DATA_DIR, "impersonated_service_account_service_account_source.json"
- )
- EXTERNAL_ACCOUNT_AUTHORIZED_USER_FILE = os.path.join(
- DATA_DIR, "external_account_authorized_user.json"
- )
- EXTERNAL_ACCOUNT_AUTHORIZED_USER_NON_GDU_FILE = os.path.join(
- DATA_DIR, "external_account_authorized_user_non_gdu.json"
- )
- MOCK_CREDENTIALS = mock.Mock(spec=credentials.CredentialsWithQuotaProject)
- MOCK_CREDENTIALS.with_quota_project.return_value = MOCK_CREDENTIALS
- def get_project_id_side_effect(self, request=None):
- # If no scopes are set, this will always return None.
- if not self.scopes:
- return None
- return mock.sentinel.project_id
- LOAD_FILE_PATCH = mock.patch(
- "google.auth._default.load_credentials_from_file",
- return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
- autospec=True,
- )
- EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH = mock.patch.object(
- external_account.Credentials,
- "get_project_id",
- side_effect=get_project_id_side_effect,
- autospec=True,
- )
- def test_load_credentials_from_missing_file():
- with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
- _default.load_credentials_from_file("")
- assert excinfo.match(r"not found")
- def test_load_credentials_from_dict_non_dict_object():
- with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
- _default.load_credentials_from_dict("")
- assert excinfo.match(r"dict type was expected")
- with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
- _default.load_credentials_from_dict(None)
- assert excinfo.match(r"dict type was expected")
- with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
- _default.load_credentials_from_dict(1)
- assert excinfo.match(r"dict type was expected")
- def test_load_credentials_from_dict_authorized_user():
- credentials, project_id = _default.load_credentials_from_dict(
- AUTHORIZED_USER_FILE_DATA
- )
- assert isinstance(credentials, google.oauth2.credentials.Credentials)
- assert project_id is None
- def test_load_credentials_from_file_invalid_json(tmpdir):
- jsonfile = tmpdir.join("invalid.json")
- jsonfile.write("{")
- with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
- _default.load_credentials_from_file(str(jsonfile))
- assert excinfo.match(r"not a valid json file")
- def test_load_credentials_from_file_invalid_type(tmpdir):
- jsonfile = tmpdir.join("invalid.json")
- jsonfile.write(json.dumps({"type": "not-a-real-type"}))
- with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
- _default.load_credentials_from_file(str(jsonfile))
- assert excinfo.match(r"does not have a valid type")
- def test_load_credentials_from_file_authorized_user():
- credentials, project_id = _default.load_credentials_from_file(AUTHORIZED_USER_FILE)
- assert isinstance(credentials, google.oauth2.credentials.Credentials)
- assert project_id is None
- def test_load_credentials_from_file_no_type(tmpdir):
- # use the client_secrets.json, which is valid json but not a
- # loadable credentials type
- with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
- _default.load_credentials_from_file(CLIENT_SECRETS_FILE)
- assert excinfo.match(r"does not have a valid type")
- assert excinfo.match(r"Type is None")
- def test_load_credentials_from_file_authorized_user_bad_format(tmpdir):
- filename = tmpdir.join("authorized_user_bad.json")
- filename.write(json.dumps({"type": "authorized_user"}))
- with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
- _default.load_credentials_from_file(str(filename))
- assert excinfo.match(r"Failed to load authorized user")
- assert excinfo.match(r"missing fields")
- def test_load_credentials_from_file_authorized_user_cloud_sdk():
- with pytest.warns(UserWarning, match="Cloud SDK"):
- credentials, project_id = _default.load_credentials_from_file(
- AUTHORIZED_USER_CLOUD_SDK_FILE
- )
- assert isinstance(credentials, google.oauth2.credentials.Credentials)
- assert project_id is None
- # No warning if the json file has quota project id.
- credentials, project_id = _default.load_credentials_from_file(
- AUTHORIZED_USER_CLOUD_SDK_WITH_QUOTA_PROJECT_ID_FILE
- )
- assert isinstance(credentials, google.oauth2.credentials.Credentials)
- assert project_id is None
- def test_load_credentials_from_file_authorized_user_cloud_sdk_with_scopes():
- with pytest.warns(UserWarning, match="Cloud SDK"):
- credentials, project_id = _default.load_credentials_from_file(
- AUTHORIZED_USER_CLOUD_SDK_FILE,
- scopes=["https://www.google.com/calendar/feeds"],
- )
- assert isinstance(credentials, google.oauth2.credentials.Credentials)
- assert project_id is None
- assert credentials.scopes == ["https://www.google.com/calendar/feeds"]
- def test_load_credentials_from_file_authorized_user_cloud_sdk_with_quota_project():
- credentials, project_id = _default.load_credentials_from_file(
- AUTHORIZED_USER_CLOUD_SDK_FILE, quota_project_id="project-foo"
- )
- assert isinstance(credentials, google.oauth2.credentials.Credentials)
- assert project_id is None
- assert credentials.quota_project_id == "project-foo"
- def test_load_credentials_from_file_service_account():
- credentials, project_id = _default.load_credentials_from_file(SERVICE_ACCOUNT_FILE)
- assert isinstance(credentials, service_account.Credentials)
- assert project_id == SERVICE_ACCOUNT_FILE_DATA["project_id"]
- def test_load_credentials_from_file_service_account_with_scopes():
- credentials, project_id = _default.load_credentials_from_file(
- SERVICE_ACCOUNT_FILE, scopes=["https://www.google.com/calendar/feeds"]
- )
- assert isinstance(credentials, service_account.Credentials)
- assert project_id == SERVICE_ACCOUNT_FILE_DATA["project_id"]
- assert credentials.scopes == ["https://www.google.com/calendar/feeds"]
- def test_load_credentials_from_file_service_account_with_quota_project():
- credentials, project_id = _default.load_credentials_from_file(
- SERVICE_ACCOUNT_FILE, quota_project_id="project-foo"
- )
- assert isinstance(credentials, service_account.Credentials)
- assert project_id == SERVICE_ACCOUNT_FILE_DATA["project_id"]
- assert credentials.quota_project_id == "project-foo"
- def test_load_credentials_from_file_service_account_bad_format(tmpdir):
- filename = tmpdir.join("serivce_account_bad.json")
- filename.write(json.dumps({"type": "service_account"}))
- with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
- _default.load_credentials_from_file(str(filename))
- assert excinfo.match(r"Failed to load service account")
- assert excinfo.match(r"missing fields")
- def test_load_credentials_from_file_impersonated_with_authorized_user_source():
- credentials, project_id = _default.load_credentials_from_file(
- IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE
- )
- assert isinstance(credentials, impersonated_credentials.Credentials)
- assert isinstance(
- credentials._source_credentials, google.oauth2.credentials.Credentials
- )
- assert credentials.service_account_email == "service-account-target@example.com"
- assert credentials._delegates == ["service-account-delegate@example.com"]
- assert not credentials._quota_project_id
- assert not credentials._target_scopes
- assert project_id is None
- def test_load_credentials_from_file_impersonated_with_quota_project():
- credentials, _ = _default.load_credentials_from_file(
- IMPERSONATED_SERVICE_ACCOUNT_WITH_QUOTA_PROJECT_FILE
- )
- assert isinstance(credentials, impersonated_credentials.Credentials)
- assert credentials._quota_project_id == "quota_project"
- def test_load_credentials_from_file_impersonated_with_service_account_source():
- credentials, _ = _default.load_credentials_from_file(
- IMPERSONATED_SERVICE_ACCOUNT_SERVICE_ACCOUNT_SOURCE_FILE
- )
- assert isinstance(credentials, impersonated_credentials.Credentials)
- assert isinstance(credentials._source_credentials, service_account.Credentials)
- assert not credentials._quota_project_id
- def test_load_credentials_from_file_impersonated_passing_quota_project():
- credentials, _ = _default.load_credentials_from_file(
- IMPERSONATED_SERVICE_ACCOUNT_SERVICE_ACCOUNT_SOURCE_FILE,
- quota_project_id="new_quota_project",
- )
- assert credentials._quota_project_id == "new_quota_project"
- def test_load_credentials_from_file_impersonated_passing_scopes():
- credentials, _ = _default.load_credentials_from_file(
- IMPERSONATED_SERVICE_ACCOUNT_SERVICE_ACCOUNT_SOURCE_FILE,
- scopes=["scope1", "scope2"],
- )
- assert credentials._target_scopes == ["scope1", "scope2"]
- def test_load_credentials_from_file_impersonated_wrong_target_principal(tmpdir):
- with open(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE) as fh:
- impersonated_credentials_info = json.load(fh)
- impersonated_credentials_info[
- "service_account_impersonation_url"
- ] = "something_wrong"
- jsonfile = tmpdir.join("invalid.json")
- jsonfile.write(json.dumps(impersonated_credentials_info))
- with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
- _default.load_credentials_from_file(str(jsonfile))
- assert excinfo.match(r"Cannot extract target principal")
- def test_load_credentials_from_file_impersonated_wrong_source_type(tmpdir):
- with open(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE) as fh:
- impersonated_credentials_info = json.load(fh)
- impersonated_credentials_info["source_credentials"]["type"] = "external_account"
- jsonfile = tmpdir.join("invalid.json")
- jsonfile.write(json.dumps(impersonated_credentials_info))
- with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
- _default.load_credentials_from_file(str(jsonfile))
- assert excinfo.match(r"source credential of type external_account is not supported")
- @EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
- def test_load_credentials_from_file_external_account_identity_pool(
- get_project_id, tmpdir
- ):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IDENTITY_POOL_DATA))
- credentials, project_id = _default.load_credentials_from_file(str(config_file))
- assert isinstance(credentials, identity_pool.Credentials)
- # Since no scopes are specified, the project ID cannot be determined.
- assert project_id is None
- assert get_project_id.called
- @EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
- def test_load_credentials_from_file_external_account_aws(get_project_id, tmpdir):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(AWS_DATA))
- credentials, project_id = _default.load_credentials_from_file(str(config_file))
- assert isinstance(credentials, aws.Credentials)
- # Since no scopes are specified, the project ID cannot be determined.
- assert project_id is None
- assert get_project_id.called
- @EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
- def test_load_credentials_from_file_external_account_identity_pool_impersonated(
- get_project_id, tmpdir
- ):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IMPERSONATED_IDENTITY_POOL_DATA))
- credentials, project_id = _default.load_credentials_from_file(str(config_file))
- assert isinstance(credentials, identity_pool.Credentials)
- assert not credentials.is_user
- assert not credentials.is_workforce_pool
- # Since no scopes are specified, the project ID cannot be determined.
- assert project_id is None
- assert get_project_id.called
- @EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
- def test_load_credentials_from_file_external_account_aws_impersonated(
- get_project_id, tmpdir
- ):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IMPERSONATED_AWS_DATA))
- credentials, project_id = _default.load_credentials_from_file(str(config_file))
- assert isinstance(credentials, aws.Credentials)
- assert not credentials.is_user
- assert not credentials.is_workforce_pool
- # Since no scopes are specified, the project ID cannot be determined.
- assert project_id is None
- assert get_project_id.called
- @EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
- def test_load_credentials_from_file_external_account_workforce(get_project_id, tmpdir):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IDENTITY_POOL_WORKFORCE_DATA))
- credentials, project_id = _default.load_credentials_from_file(str(config_file))
- assert isinstance(credentials, identity_pool.Credentials)
- assert credentials.is_user
- assert credentials.is_workforce_pool
- # Since no scopes are specified, the project ID cannot be determined.
- assert project_id is None
- assert get_project_id.called
- @EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
- def test_load_credentials_from_file_external_account_workforce_impersonated(
- get_project_id, tmpdir
- ):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IMPERSONATED_IDENTITY_POOL_WORKFORCE_DATA))
- credentials, project_id = _default.load_credentials_from_file(str(config_file))
- assert isinstance(credentials, identity_pool.Credentials)
- assert not credentials.is_user
- assert credentials.is_workforce_pool
- # Since no scopes are specified, the project ID cannot be determined.
- assert project_id is None
- assert get_project_id.called
- @EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
- def test_load_credentials_from_file_external_account_with_user_and_default_scopes(
- get_project_id, tmpdir
- ):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IDENTITY_POOL_DATA))
- credentials, project_id = _default.load_credentials_from_file(
- str(config_file),
- scopes=["https://www.google.com/calendar/feeds"],
- default_scopes=["https://www.googleapis.com/auth/cloud-platform"],
- )
- assert isinstance(credentials, identity_pool.Credentials)
- # Since scopes are specified, the project ID can be determined.
- assert project_id is mock.sentinel.project_id
- assert credentials.scopes == ["https://www.google.com/calendar/feeds"]
- assert credentials.default_scopes == [
- "https://www.googleapis.com/auth/cloud-platform"
- ]
- @EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
- def test_load_credentials_from_file_external_account_with_quota_project(
- get_project_id, tmpdir
- ):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IDENTITY_POOL_DATA))
- credentials, project_id = _default.load_credentials_from_file(
- str(config_file), quota_project_id="project-foo"
- )
- assert isinstance(credentials, identity_pool.Credentials)
- # Since no scopes are specified, the project ID cannot be determined.
- assert project_id is None
- assert credentials.quota_project_id == "project-foo"
- def test_load_credentials_from_file_external_account_bad_format(tmpdir):
- filename = tmpdir.join("external_account_bad.json")
- filename.write(json.dumps({"type": "external_account"}))
- with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
- _default.load_credentials_from_file(str(filename))
- assert excinfo.match(
- "Failed to load external account credentials from {}".format(str(filename))
- )
- @EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
- def test_load_credentials_from_file_external_account_explicit_request(
- get_project_id, tmpdir
- ):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IDENTITY_POOL_DATA))
- credentials, project_id = _default.load_credentials_from_file(
- str(config_file),
- request=mock.sentinel.request,
- scopes=["https://www.googleapis.com/auth/cloud-platform"],
- )
- assert isinstance(credentials, identity_pool.Credentials)
- # Since scopes are specified, the project ID can be determined.
- assert project_id is mock.sentinel.project_id
- get_project_id.assert_called_with(credentials, request=mock.sentinel.request)
- @mock.patch.dict(os.environ, {}, clear=True)
- def test__get_explicit_environ_credentials_no_env():
- assert _default._get_explicit_environ_credentials() == (None, None)
- def test_load_credentials_from_file_external_account_authorized_user():
- credentials, project_id = _default.load_credentials_from_file(
- EXTERNAL_ACCOUNT_AUTHORIZED_USER_FILE, request=mock.sentinel.request
- )
- assert isinstance(credentials, external_account_authorized_user.Credentials)
- assert project_id is None
- def test_load_credentials_from_file_external_account_authorized_user_non_gdu():
- credentials, _ = _default.load_credentials_from_file(
- EXTERNAL_ACCOUNT_AUTHORIZED_USER_NON_GDU_FILE, request=mock.sentinel.request
- )
- assert isinstance(credentials, external_account_authorized_user.Credentials)
- assert credentials.universe_domain == "fake_universe_domain"
- def test_load_credentials_from_file_external_account_authorized_user_bad_format(tmpdir):
- filename = tmpdir.join("external_account_authorized_user_bad.json")
- filename.write(json.dumps({"type": "external_account_authorized_user"}))
- with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
- _default.load_credentials_from_file(str(filename))
- assert excinfo.match(
- "Failed to load external account authorized user credentials from {}".format(
- str(filename)
- )
- )
- @pytest.mark.parametrize("quota_project_id", [None, "project-foo"])
- @LOAD_FILE_PATCH
- def test__get_explicit_environ_credentials(load, quota_project_id, monkeypatch):
- monkeypatch.setenv(environment_vars.CREDENTIALS, "filename")
- credentials, project_id = _default._get_explicit_environ_credentials(
- quota_project_id=quota_project_id
- )
- assert credentials is MOCK_CREDENTIALS
- assert project_id is mock.sentinel.project_id
- load.assert_called_with("filename", quota_project_id=quota_project_id)
- @LOAD_FILE_PATCH
- def test__get_explicit_environ_credentials_no_project_id(load, monkeypatch):
- load.return_value = MOCK_CREDENTIALS, None
- monkeypatch.setenv(environment_vars.CREDENTIALS, "filename")
- credentials, project_id = _default._get_explicit_environ_credentials()
- assert credentials is MOCK_CREDENTIALS
- assert project_id is None
- @pytest.mark.parametrize("quota_project_id", [None, "project-foo"])
- @mock.patch(
- "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
- )
- @mock.patch("google.auth._default._get_gcloud_sdk_credentials", autospec=True)
- def test__get_explicit_environ_credentials_fallback_to_gcloud(
- get_gcloud_creds, get_adc_path, quota_project_id, monkeypatch
- ):
- # Set explicit credentials path to cloud sdk credentials path.
- get_adc_path.return_value = "filename"
- monkeypatch.setenv(environment_vars.CREDENTIALS, "filename")
- _default._get_explicit_environ_credentials(quota_project_id=quota_project_id)
- # Check we fall back to cloud sdk flow since explicit credentials path is
- # cloud sdk credentials path
- get_gcloud_creds.assert_called_with(quota_project_id=quota_project_id)
- @pytest.mark.parametrize("quota_project_id", [None, "project-foo"])
- @LOAD_FILE_PATCH
- @mock.patch(
- "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
- )
- def test__get_gcloud_sdk_credentials(get_adc_path, load, quota_project_id):
- get_adc_path.return_value = SERVICE_ACCOUNT_FILE
- credentials, project_id = _default._get_gcloud_sdk_credentials(
- quota_project_id=quota_project_id
- )
- assert credentials is MOCK_CREDENTIALS
- assert project_id is mock.sentinel.project_id
- load.assert_called_with(SERVICE_ACCOUNT_FILE, quota_project_id=quota_project_id)
- @mock.patch(
- "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
- )
- def test__get_gcloud_sdk_credentials_non_existent(get_adc_path, tmpdir):
- non_existent = tmpdir.join("non-existent")
- get_adc_path.return_value = str(non_existent)
- credentials, project_id = _default._get_gcloud_sdk_credentials()
- assert credentials is None
- assert project_id is None
- @mock.patch(
- "google.auth._cloud_sdk.get_project_id",
- return_value=mock.sentinel.project_id,
- autospec=True,
- )
- @mock.patch("os.path.isfile", return_value=True, autospec=True)
- @LOAD_FILE_PATCH
- def test__get_gcloud_sdk_credentials_project_id(load, unused_isfile, get_project_id):
- # Don't return a project ID from load file, make the function check
- # the Cloud SDK project.
- load.return_value = MOCK_CREDENTIALS, None
- credentials, project_id = _default._get_gcloud_sdk_credentials()
- assert credentials == MOCK_CREDENTIALS
- assert project_id == mock.sentinel.project_id
- assert get_project_id.called
- @mock.patch("google.auth._cloud_sdk.get_project_id", return_value=None, autospec=True)
- @mock.patch("os.path.isfile", return_value=True)
- @LOAD_FILE_PATCH
- def test__get_gcloud_sdk_credentials_no_project_id(load, unused_isfile, get_project_id):
- # Don't return a project ID from load file, make the function check
- # the Cloud SDK project.
- load.return_value = MOCK_CREDENTIALS, None
- credentials, project_id = _default._get_gcloud_sdk_credentials()
- assert credentials == MOCK_CREDENTIALS
- assert project_id is None
- assert get_project_id.called
- def test__get_gdch_service_account_credentials_invalid_format_version():
- with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
- _default._get_gdch_service_account_credentials(
- "file_name", {"format_version": "2"}
- )
- assert excinfo.match("Failed to load GDCH service account credentials")
- def test_get_api_key_credentials():
- creds = _default.get_api_key_credentials("api_key")
- assert isinstance(creds, api_key.Credentials)
- assert creds.token == "api_key"
- class _AppIdentityModule(object):
- """The interface of the App Idenity app engine module.
- See https://cloud.google.com/appengine/docs/standard/python/refdocs\
- /google.appengine.api.app_identity.app_identity
- """
- def get_application_id(self):
- raise NotImplementedError()
- @pytest.fixture
- def app_identity(monkeypatch):
- """Mocks the app_identity module for google.auth.app_engine."""
- app_identity_module = mock.create_autospec(_AppIdentityModule, instance=True)
- monkeypatch.setattr(app_engine, "app_identity", app_identity_module)
- yield app_identity_module
- @mock.patch.dict(os.environ)
- def test__get_gae_credentials_gen1(app_identity):
- os.environ[environment_vars.LEGACY_APPENGINE_RUNTIME] = "python27"
- app_identity.get_application_id.return_value = mock.sentinel.project
- credentials, project_id = _default._get_gae_credentials()
- assert isinstance(credentials, app_engine.Credentials)
- assert project_id == mock.sentinel.project
- @mock.patch.dict(os.environ)
- def test__get_gae_credentials_gen2():
- os.environ["GAE_RUNTIME"] = "python37"
- credentials, project_id = _default._get_gae_credentials()
- assert credentials is None
- assert project_id is None
- @mock.patch.dict(os.environ)
- def test__get_gae_credentials_gen2_backwards_compat():
- # compat helpers may copy GAE_RUNTIME to APPENGINE_RUNTIME
- # for backwards compatibility with code that relies on it
- os.environ[environment_vars.LEGACY_APPENGINE_RUNTIME] = "python37"
- os.environ["GAE_RUNTIME"] = "python37"
- credentials, project_id = _default._get_gae_credentials()
- assert credentials is None
- assert project_id is None
- def test__get_gae_credentials_env_unset():
- assert environment_vars.LEGACY_APPENGINE_RUNTIME not in os.environ
- assert "GAE_RUNTIME" not in os.environ
- credentials, project_id = _default._get_gae_credentials()
- assert credentials is None
- assert project_id is None
- @mock.patch.dict(os.environ)
- def test__get_gae_credentials_no_app_engine():
- # test both with and without LEGACY_APPENGINE_RUNTIME setting
- assert environment_vars.LEGACY_APPENGINE_RUNTIME not in os.environ
- import sys
- with mock.patch.dict(sys.modules, {"google.auth.app_engine": None}):
- credentials, project_id = _default._get_gae_credentials()
- assert credentials is None
- assert project_id is None
- os.environ[environment_vars.LEGACY_APPENGINE_RUNTIME] = "python27"
- credentials, project_id = _default._get_gae_credentials()
- assert credentials is None
- assert project_id is None
- @mock.patch.dict(os.environ)
- @mock.patch.object(app_engine, "app_identity", new=None)
- def test__get_gae_credentials_no_apis():
- # test both with and without LEGACY_APPENGINE_RUNTIME setting
- assert environment_vars.LEGACY_APPENGINE_RUNTIME not in os.environ
- credentials, project_id = _default._get_gae_credentials()
- assert credentials is None
- assert project_id is None
- os.environ[environment_vars.LEGACY_APPENGINE_RUNTIME] = "python27"
- credentials, project_id = _default._get_gae_credentials()
- assert credentials is None
- assert project_id is None
- @mock.patch(
- "google.auth.compute_engine._metadata.is_on_gce", return_value=True, autospec=True
- )
- @mock.patch(
- "google.auth.compute_engine._metadata.get_project_id",
- return_value="example-project",
- autospec=True,
- )
- def test__get_gce_credentials(unused_get, unused_ping):
- credentials, project_id = _default._get_gce_credentials()
- assert isinstance(credentials, compute_engine.Credentials)
- assert project_id == "example-project"
- @mock.patch(
- "google.auth.compute_engine._metadata.is_on_gce", return_value=False, autospec=True
- )
- def test__get_gce_credentials_no_ping(unused_ping):
- credentials, project_id = _default._get_gce_credentials()
- assert credentials is None
- assert project_id is None
- @mock.patch(
- "google.auth.compute_engine._metadata.is_on_gce", return_value=True, autospec=True
- )
- @mock.patch(
- "google.auth.compute_engine._metadata.get_project_id",
- side_effect=exceptions.TransportError(),
- autospec=True,
- )
- def test__get_gce_credentials_no_project_id(unused_get, unused_ping):
- credentials, project_id = _default._get_gce_credentials()
- assert isinstance(credentials, compute_engine.Credentials)
- assert project_id is None
- def test__get_gce_credentials_no_compute_engine():
- import sys
- with mock.patch.dict("sys.modules"):
- sys.modules["google.auth.compute_engine"] = None
- credentials, project_id = _default._get_gce_credentials()
- assert credentials is None
- assert project_id is None
- @mock.patch(
- "google.auth.compute_engine._metadata.is_on_gce", return_value=False, autospec=True
- )
- def test__get_gce_credentials_explicit_request(ping):
- _default._get_gce_credentials(mock.sentinel.request)
- ping.assert_called_with(request=mock.sentinel.request)
- @mock.patch(
- "google.auth._default._get_explicit_environ_credentials",
- return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
- autospec=True,
- )
- def test_default_early_out(unused_get):
- assert _default.default() == (MOCK_CREDENTIALS, mock.sentinel.project_id)
- @mock.patch(
- "google.auth._default._get_explicit_environ_credentials",
- return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
- autospec=True,
- )
- def test_default_explict_project_id(unused_get, monkeypatch):
- monkeypatch.setenv(environment_vars.PROJECT, "explicit-env")
- assert _default.default() == (MOCK_CREDENTIALS, "explicit-env")
- @mock.patch(
- "google.auth._default._get_explicit_environ_credentials",
- return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
- autospec=True,
- )
- def test_default_explict_legacy_project_id(unused_get, monkeypatch):
- monkeypatch.setenv(environment_vars.LEGACY_PROJECT, "explicit-env")
- assert _default.default() == (MOCK_CREDENTIALS, "explicit-env")
- @mock.patch("logging.Logger.warning", autospec=True)
- @mock.patch(
- "google.auth._default._get_explicit_environ_credentials",
- return_value=(MOCK_CREDENTIALS, None),
- autospec=True,
- )
- @mock.patch(
- "google.auth._default._get_gcloud_sdk_credentials",
- return_value=(MOCK_CREDENTIALS, None),
- autospec=True,
- )
- @mock.patch(
- "google.auth._default._get_gae_credentials",
- return_value=(MOCK_CREDENTIALS, None),
- autospec=True,
- )
- @mock.patch(
- "google.auth._default._get_gce_credentials",
- return_value=(MOCK_CREDENTIALS, None),
- autospec=True,
- )
- def test_default_without_project_id(
- unused_gce, unused_gae, unused_sdk, unused_explicit, logger_warning
- ):
- assert _default.default() == (MOCK_CREDENTIALS, None)
- logger_warning.assert_called_with(mock.ANY, mock.ANY, mock.ANY)
- @mock.patch(
- "google.auth._default._get_explicit_environ_credentials",
- return_value=(None, None),
- autospec=True,
- )
- @mock.patch(
- "google.auth._default._get_gcloud_sdk_credentials",
- return_value=(None, None),
- autospec=True,
- )
- @mock.patch(
- "google.auth._default._get_gae_credentials",
- return_value=(None, None),
- autospec=True,
- )
- @mock.patch(
- "google.auth._default._get_gce_credentials",
- return_value=(None, None),
- autospec=True,
- )
- def test_default_fail(unused_gce, unused_gae, unused_sdk, unused_explicit):
- with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
- assert _default.default()
- assert excinfo.match(_default._CLOUD_SDK_MISSING_CREDENTIALS)
- @mock.patch(
- "google.auth._default._get_explicit_environ_credentials",
- return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
- autospec=True,
- )
- @mock.patch(
- "google.auth.credentials.with_scopes_if_required",
- return_value=MOCK_CREDENTIALS,
- autospec=True,
- )
- def test_default_scoped(with_scopes, unused_get):
- scopes = ["one", "two"]
- credentials, project_id = _default.default(scopes=scopes)
- assert credentials == with_scopes.return_value
- assert project_id == mock.sentinel.project_id
- with_scopes.assert_called_once_with(MOCK_CREDENTIALS, scopes, default_scopes=None)
- @mock.patch(
- "google.auth._default._get_explicit_environ_credentials",
- return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
- autospec=True,
- )
- def test_default_quota_project(with_quota_project):
- credentials, project_id = _default.default(quota_project_id="project-foo")
- MOCK_CREDENTIALS.with_quota_project.assert_called_once_with("project-foo")
- assert project_id == mock.sentinel.project_id
- @mock.patch(
- "google.auth._default._get_explicit_environ_credentials",
- return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
- autospec=True,
- )
- def test_default_no_app_engine_compute_engine_module(unused_get):
- """
- google.auth.compute_engine and google.auth.app_engine are both optional
- to allow not including them when using this package. This verifies
- that default fails gracefully if these modules are absent
- """
- import sys
- with mock.patch.dict("sys.modules"):
- sys.modules["google.auth.compute_engine"] = None
- sys.modules["google.auth.app_engine"] = None
- assert _default.default() == (MOCK_CREDENTIALS, mock.sentinel.project_id)
- @EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
- def test_default_environ_external_credentials_identity_pool(
- get_project_id, monkeypatch, tmpdir
- ):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IDENTITY_POOL_DATA))
- monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
- credentials, project_id = _default.default()
- assert isinstance(credentials, identity_pool.Credentials)
- assert not credentials.is_user
- assert not credentials.is_workforce_pool
- # Without scopes, project ID cannot be determined.
- assert project_id is None
- @EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
- def test_default_environ_external_credentials_identity_pool_impersonated(
- get_project_id, monkeypatch, tmpdir
- ):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IMPERSONATED_IDENTITY_POOL_DATA))
- monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
- credentials, project_id = _default.default(
- scopes=["https://www.google.com/calendar/feeds"]
- )
- assert isinstance(credentials, identity_pool.Credentials)
- assert not credentials.is_user
- assert not credentials.is_workforce_pool
- assert project_id is mock.sentinel.project_id
- assert credentials.scopes == ["https://www.google.com/calendar/feeds"]
- # The credential.get_project_id should have been used in _get_external_account_credentials and default
- assert get_project_id.call_count == 2
- @EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
- @mock.patch.dict(os.environ)
- def test_default_environ_external_credentials_project_from_env(
- get_project_id, monkeypatch, tmpdir
- ):
- project_from_env = "project_from_env"
- os.environ[environment_vars.PROJECT] = project_from_env
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IMPERSONATED_IDENTITY_POOL_DATA))
- monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
- credentials, project_id = _default.default(
- scopes=["https://www.google.com/calendar/feeds"]
- )
- assert isinstance(credentials, identity_pool.Credentials)
- assert not credentials.is_user
- assert not credentials.is_workforce_pool
- assert project_id == project_from_env
- assert credentials.scopes == ["https://www.google.com/calendar/feeds"]
- # The credential.get_project_id should have been used only in _get_external_account_credentials
- assert get_project_id.call_count == 1
- @EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
- @mock.patch.dict(os.environ)
- def test_default_environ_external_credentials_legacy_project_from_env(
- get_project_id, monkeypatch, tmpdir
- ):
- project_from_env = "project_from_env"
- os.environ[environment_vars.LEGACY_PROJECT] = project_from_env
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IMPERSONATED_IDENTITY_POOL_DATA))
- monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
- credentials, project_id = _default.default(
- scopes=["https://www.google.com/calendar/feeds"]
- )
- assert isinstance(credentials, identity_pool.Credentials)
- assert not credentials.is_user
- assert not credentials.is_workforce_pool
- assert project_id == project_from_env
- assert credentials.scopes == ["https://www.google.com/calendar/feeds"]
- # The credential.get_project_id should have been used only in _get_external_account_credentials
- assert get_project_id.call_count == 1
- @EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
- def test_default_environ_external_credentials_aws_impersonated(
- get_project_id, monkeypatch, tmpdir
- ):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IMPERSONATED_AWS_DATA))
- monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
- credentials, project_id = _default.default(
- scopes=["https://www.google.com/calendar/feeds"]
- )
- assert isinstance(credentials, aws.Credentials)
- assert not credentials.is_user
- assert not credentials.is_workforce_pool
- assert project_id is mock.sentinel.project_id
- assert credentials.scopes == ["https://www.google.com/calendar/feeds"]
- @EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
- def test_default_environ_external_credentials_workforce(
- get_project_id, monkeypatch, tmpdir
- ):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IDENTITY_POOL_WORKFORCE_DATA))
- monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
- credentials, project_id = _default.default(
- scopes=["https://www.google.com/calendar/feeds"]
- )
- assert isinstance(credentials, identity_pool.Credentials)
- assert credentials.is_user
- assert credentials.is_workforce_pool
- assert project_id is mock.sentinel.project_id
- assert credentials.scopes == ["https://www.google.com/calendar/feeds"]
- @EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
- def test_default_environ_external_credentials_workforce_impersonated(
- get_project_id, monkeypatch, tmpdir
- ):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IMPERSONATED_IDENTITY_POOL_WORKFORCE_DATA))
- monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
- credentials, project_id = _default.default(
- scopes=["https://www.google.com/calendar/feeds"]
- )
- assert isinstance(credentials, identity_pool.Credentials)
- assert not credentials.is_user
- assert credentials.is_workforce_pool
- assert project_id is mock.sentinel.project_id
- assert credentials.scopes == ["https://www.google.com/calendar/feeds"]
- @EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
- def test_default_environ_external_credentials_with_user_and_default_scopes_and_quota_project_id(
- get_project_id, monkeypatch, tmpdir
- ):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IDENTITY_POOL_DATA))
- monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
- credentials, project_id = _default.default(
- scopes=["https://www.google.com/calendar/feeds"],
- default_scopes=["https://www.googleapis.com/auth/cloud-platform"],
- quota_project_id="project-foo",
- )
- assert isinstance(credentials, identity_pool.Credentials)
- assert project_id is mock.sentinel.project_id
- assert credentials.quota_project_id == "project-foo"
- assert credentials.scopes == ["https://www.google.com/calendar/feeds"]
- assert credentials.default_scopes == [
- "https://www.googleapis.com/auth/cloud-platform"
- ]
- @EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
- def test_default_environ_external_credentials_explicit_request_with_scopes(
- get_project_id, monkeypatch, tmpdir
- ):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IDENTITY_POOL_DATA))
- monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
- credentials, project_id = _default.default(
- request=mock.sentinel.request,
- scopes=["https://www.googleapis.com/auth/cloud-platform"],
- )
- assert isinstance(credentials, identity_pool.Credentials)
- assert project_id is mock.sentinel.project_id
- # default() will initialize new credentials via with_scopes_if_required
- # and potentially with_quota_project.
- # As a result the caller of get_project_id() will not match the returned
- # credentials.
- get_project_id.assert_called_with(mock.ANY, request=mock.sentinel.request)
- def test_default_environ_external_credentials_bad_format(monkeypatch, tmpdir):
- filename = tmpdir.join("external_account_bad.json")
- filename.write(json.dumps({"type": "external_account"}))
- monkeypatch.setenv(environment_vars.CREDENTIALS, str(filename))
- with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
- _default.default()
- assert excinfo.match(
- "Failed to load external account credentials from {}".format(str(filename))
- )
- @mock.patch(
- "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
- )
- def test_default_warning_without_quota_project_id_for_user_creds(get_adc_path):
- get_adc_path.return_value = AUTHORIZED_USER_CLOUD_SDK_FILE
- with pytest.warns(UserWarning, match=_default._CLOUD_SDK_CREDENTIALS_WARNING):
- credentials, project_id = _default.default(quota_project_id=None)
- @mock.patch(
- "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
- )
- def test_default_no_warning_with_quota_project_id_for_user_creds(get_adc_path):
- get_adc_path.return_value = AUTHORIZED_USER_CLOUD_SDK_FILE
- credentials, project_id = _default.default(quota_project_id="project-foo")
- @mock.patch(
- "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
- )
- def test_default_impersonated_service_account(get_adc_path):
- get_adc_path.return_value = IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE
- credentials, _ = _default.default()
- assert isinstance(credentials, impersonated_credentials.Credentials)
- assert isinstance(
- credentials._source_credentials, google.oauth2.credentials.Credentials
- )
- assert credentials.service_account_email == "service-account-target@example.com"
- assert credentials._delegates == ["service-account-delegate@example.com"]
- assert not credentials._quota_project_id
- assert not credentials._target_scopes
- @mock.patch(
- "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
- )
- def test_default_impersonated_service_account_set_scopes(get_adc_path):
- get_adc_path.return_value = IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE
- scopes = ["scope1", "scope2"]
- credentials, _ = _default.default(scopes=scopes)
- assert credentials._target_scopes == scopes
- @mock.patch(
- "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
- )
- def test_default_impersonated_service_account_set_default_scopes(get_adc_path):
- get_adc_path.return_value = IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE
- default_scopes = ["scope1", "scope2"]
- credentials, _ = _default.default(default_scopes=default_scopes)
- assert credentials._target_scopes == default_scopes
- @mock.patch(
- "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
- )
- def test_default_impersonated_service_account_set_both_scopes_and_default_scopes(
- get_adc_path
- ):
- get_adc_path.return_value = IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE
- scopes = ["scope1", "scope2"]
- default_scopes = ["scope3", "scope4"]
- credentials, _ = _default.default(scopes=scopes, default_scopes=default_scopes)
- assert credentials._target_scopes == scopes
- @EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
- def test_load_credentials_from_external_account_pluggable(get_project_id, tmpdir):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(PLUGGABLE_DATA))
- credentials, project_id = _default.load_credentials_from_file(str(config_file))
- assert isinstance(credentials, pluggable.Credentials)
- # Since no scopes are specified, the project ID cannot be determined.
- assert project_id is None
- assert get_project_id.called
- @mock.patch(
- "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
- )
- def test_default_gdch_service_account_credentials(get_adc_path):
- get_adc_path.return_value = GDCH_SERVICE_ACCOUNT_FILE
- creds, project = _default.default(quota_project_id="project-foo")
- assert isinstance(creds, gdch_credentials.ServiceAccountCredentials)
- assert creds._service_identity_name == "service_identity_name"
- assert creds._audience is None
- assert creds._token_uri == "https://service-identity.<Domain>/authenticate"
- assert creds._ca_cert_path == "/path/to/ca/cert"
- assert project == "project_foo"
- @mock.patch.dict(os.environ)
- @mock.patch(
- "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
- )
- def test_quota_project_from_environment(get_adc_path):
- get_adc_path.return_value = AUTHORIZED_USER_CLOUD_SDK_WITH_QUOTA_PROJECT_ID_FILE
- credentials, _ = _default.default(quota_project_id=None)
- assert credentials.quota_project_id == "quota_project_id"
- quota_from_env = "quota_from_env"
- os.environ[environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT] = quota_from_env
- credentials, _ = _default.default(quota_project_id=None)
- assert credentials.quota_project_id == quota_from_env
- explicit_quota = "explicit_quota"
- credentials, _ = _default.default(quota_project_id=explicit_quota)
- assert credentials.quota_project_id == explicit_quota
- @mock.patch(
- "google.auth.compute_engine._metadata.is_on_gce", return_value=True, autospec=True
- )
- @mock.patch(
- "google.auth.compute_engine._metadata.get_project_id",
- return_value="example-project",
- autospec=True,
- )
- @mock.patch.dict(os.environ)
- def test_quota_gce_credentials(unused_get, unused_ping):
- # No quota
- credentials, project_id = _default._get_gce_credentials()
- assert project_id == "example-project"
- assert credentials.quota_project_id is None
- # Quota from environment
- quota_from_env = "quota_from_env"
- os.environ[environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT] = quota_from_env
- credentials, project_id = _default._get_gce_credentials()
- assert credentials.quota_project_id == quota_from_env
- # Explicit quota
- explicit_quota = "explicit_quota"
- credentials, project_id = _default._get_gce_credentials(
- quota_project_id=explicit_quota
- )
- assert credentials.quota_project_id == explicit_quota
|