test_external_account.py 47 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203
  1. # Copyright 2020 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import datetime
  15. import json
  16. import mock
  17. import pytest
  18. from six.moves import http_client
  19. from six.moves import urllib
  20. from google.auth import _helpers
  21. from google.auth import exceptions
  22. from google.auth import external_account
  23. from google.auth import transport
  24. CLIENT_ID = "username"
  25. CLIENT_SECRET = "password"
  26. # Base64 encoding of "username:password"
  27. BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
  28. SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
  29. # List of valid workforce pool audiences.
  30. TEST_USER_AUDIENCES = [
  31. "//iam.googleapis.com/locations/global/workforcePools/pool-id/providers/provider-id",
  32. "//iam.googleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
  33. "//iam.googleapis.com/locations/eu/workforcePools/workloadIdentityPools/providers/provider-id",
  34. ]
  35. class CredentialsImpl(external_account.Credentials):
  36. def __init__(
  37. self,
  38. audience,
  39. subject_token_type,
  40. token_url,
  41. credential_source,
  42. service_account_impersonation_url=None,
  43. client_id=None,
  44. client_secret=None,
  45. quota_project_id=None,
  46. scopes=None,
  47. default_scopes=None,
  48. ):
  49. super(CredentialsImpl, self).__init__(
  50. audience=audience,
  51. subject_token_type=subject_token_type,
  52. token_url=token_url,
  53. credential_source=credential_source,
  54. service_account_impersonation_url=service_account_impersonation_url,
  55. client_id=client_id,
  56. client_secret=client_secret,
  57. quota_project_id=quota_project_id,
  58. scopes=scopes,
  59. default_scopes=default_scopes,
  60. )
  61. self._counter = 0
  62. def retrieve_subject_token(self, request):
  63. counter = self._counter
  64. self._counter += 1
  65. return "subject_token_{}".format(counter)
  66. class TestCredentials(object):
  67. TOKEN_URL = "https://sts.googleapis.com/v1/token"
  68. PROJECT_NUMBER = "123456"
  69. POOL_ID = "POOL_ID"
  70. PROVIDER_ID = "PROVIDER_ID"
  71. AUDIENCE = (
  72. "//iam.googleapis.com/projects/{}"
  73. "/locations/global/workloadIdentityPools/{}"
  74. "/providers/{}"
  75. ).format(PROJECT_NUMBER, POOL_ID, PROVIDER_ID)
  76. SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
  77. CREDENTIAL_SOURCE = {"file": "/var/run/secrets/goog.id/token"}
  78. SUCCESS_RESPONSE = {
  79. "access_token": "ACCESS_TOKEN",
  80. "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
  81. "token_type": "Bearer",
  82. "expires_in": 3600,
  83. "scope": "scope1 scope2",
  84. }
  85. ERROR_RESPONSE = {
  86. "error": "invalid_request",
  87. "error_description": "Invalid subject token",
  88. "error_uri": "https://tools.ietf.org/html/rfc6749",
  89. }
  90. QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
  91. SERVICE_ACCOUNT_IMPERSONATION_URL = (
  92. "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
  93. + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
  94. )
  95. SCOPES = ["scope1", "scope2"]
  96. IMPERSONATION_ERROR_RESPONSE = {
  97. "error": {
  98. "code": 400,
  99. "message": "Request contains an invalid argument",
  100. "status": "INVALID_ARGUMENT",
  101. }
  102. }
  103. PROJECT_ID = "my-proj-id"
  104. CLOUD_RESOURCE_MANAGER_URL = (
  105. "https://cloudresourcemanager.googleapis.com/v1/projects/"
  106. )
  107. CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE = {
  108. "projectNumber": PROJECT_NUMBER,
  109. "projectId": PROJECT_ID,
  110. "lifecycleState": "ACTIVE",
  111. "name": "project-name",
  112. "createTime": "2018-11-06T04:42:54.109Z",
  113. "parent": {"type": "folder", "id": "12345678901"},
  114. }
  115. @classmethod
  116. def make_credentials(
  117. cls,
  118. client_id=None,
  119. client_secret=None,
  120. quota_project_id=None,
  121. scopes=None,
  122. default_scopes=None,
  123. service_account_impersonation_url=None,
  124. ):
  125. return CredentialsImpl(
  126. audience=cls.AUDIENCE,
  127. subject_token_type=cls.SUBJECT_TOKEN_TYPE,
  128. token_url=cls.TOKEN_URL,
  129. service_account_impersonation_url=service_account_impersonation_url,
  130. credential_source=cls.CREDENTIAL_SOURCE,
  131. client_id=client_id,
  132. client_secret=client_secret,
  133. quota_project_id=quota_project_id,
  134. scopes=scopes,
  135. default_scopes=default_scopes,
  136. )
  137. @classmethod
  138. def make_mock_request(
  139. cls,
  140. status=http_client.OK,
  141. data=None,
  142. impersonation_status=None,
  143. impersonation_data=None,
  144. cloud_resource_manager_status=None,
  145. cloud_resource_manager_data=None,
  146. ):
  147. # STS token exchange request.
  148. token_response = mock.create_autospec(transport.Response, instance=True)
  149. token_response.status = status
  150. token_response.data = json.dumps(data).encode("utf-8")
  151. responses = [token_response]
  152. # If service account impersonation is requested, mock the expected response.
  153. if impersonation_status:
  154. impersonation_response = mock.create_autospec(
  155. transport.Response, instance=True
  156. )
  157. impersonation_response.status = impersonation_status
  158. impersonation_response.data = json.dumps(impersonation_data).encode("utf-8")
  159. responses.append(impersonation_response)
  160. # If cloud resource manager is requested, mock the expected response.
  161. if cloud_resource_manager_status:
  162. cloud_resource_manager_response = mock.create_autospec(
  163. transport.Response, instance=True
  164. )
  165. cloud_resource_manager_response.status = cloud_resource_manager_status
  166. cloud_resource_manager_response.data = json.dumps(
  167. cloud_resource_manager_data
  168. ).encode("utf-8")
  169. responses.append(cloud_resource_manager_response)
  170. request = mock.create_autospec(transport.Request)
  171. request.side_effect = responses
  172. return request
  173. @classmethod
  174. def assert_token_request_kwargs(cls, request_kwargs, headers, request_data):
  175. assert request_kwargs["url"] == cls.TOKEN_URL
  176. assert request_kwargs["method"] == "POST"
  177. assert request_kwargs["headers"] == headers
  178. assert request_kwargs["body"] is not None
  179. body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
  180. for (k, v) in body_tuples:
  181. assert v.decode("utf-8") == request_data[k.decode("utf-8")]
  182. assert len(body_tuples) == len(request_data.keys())
  183. @classmethod
  184. def assert_impersonation_request_kwargs(cls, request_kwargs, headers, request_data):
  185. assert request_kwargs["url"] == cls.SERVICE_ACCOUNT_IMPERSONATION_URL
  186. assert request_kwargs["method"] == "POST"
  187. assert request_kwargs["headers"] == headers
  188. assert request_kwargs["body"] is not None
  189. body_json = json.loads(request_kwargs["body"].decode("utf-8"))
  190. assert body_json == request_data
  191. @classmethod
  192. def assert_resource_manager_request_kwargs(
  193. cls, request_kwargs, project_number, headers
  194. ):
  195. assert request_kwargs["url"] == cls.CLOUD_RESOURCE_MANAGER_URL + project_number
  196. assert request_kwargs["method"] == "GET"
  197. assert request_kwargs["headers"] == headers
  198. assert "body" not in request_kwargs
  199. def test_default_state(self):
  200. credentials = self.make_credentials()
  201. # Not token acquired yet
  202. assert not credentials.token
  203. assert not credentials.valid
  204. # Expiration hasn't been set yet
  205. assert not credentials.expiry
  206. assert not credentials.expired
  207. # Scopes are required
  208. assert not credentials.scopes
  209. assert credentials.requires_scopes
  210. assert not credentials.quota_project_id
  211. def test_with_scopes(self):
  212. credentials = self.make_credentials()
  213. assert not credentials.scopes
  214. assert credentials.requires_scopes
  215. scoped_credentials = credentials.with_scopes(["email"])
  216. assert scoped_credentials.has_scopes(["email"])
  217. assert not scoped_credentials.requires_scopes
  218. def test_with_scopes_using_user_and_default_scopes(self):
  219. credentials = self.make_credentials()
  220. assert not credentials.scopes
  221. assert credentials.requires_scopes
  222. scoped_credentials = credentials.with_scopes(
  223. ["email"], default_scopes=["profile"]
  224. )
  225. assert scoped_credentials.has_scopes(["email"])
  226. assert not scoped_credentials.has_scopes(["profile"])
  227. assert not scoped_credentials.requires_scopes
  228. assert scoped_credentials.scopes == ["email"]
  229. assert scoped_credentials.default_scopes == ["profile"]
  230. def test_with_scopes_using_default_scopes_only(self):
  231. credentials = self.make_credentials()
  232. assert not credentials.scopes
  233. assert credentials.requires_scopes
  234. scoped_credentials = credentials.with_scopes(None, default_scopes=["profile"])
  235. assert scoped_credentials.has_scopes(["profile"])
  236. assert not scoped_credentials.requires_scopes
  237. def test_with_scopes_full_options_propagated(self):
  238. credentials = self.make_credentials(
  239. client_id=CLIENT_ID,
  240. client_secret=CLIENT_SECRET,
  241. quota_project_id=self.QUOTA_PROJECT_ID,
  242. scopes=self.SCOPES,
  243. default_scopes=["default1"],
  244. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  245. )
  246. with mock.patch.object(
  247. external_account.Credentials, "__init__", return_value=None
  248. ) as mock_init:
  249. credentials.with_scopes(["email"], ["default2"])
  250. # Confirm with_scopes initialized the credential with the expected
  251. # parameters and scopes.
  252. mock_init.assert_called_once_with(
  253. audience=self.AUDIENCE,
  254. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  255. token_url=self.TOKEN_URL,
  256. credential_source=self.CREDENTIAL_SOURCE,
  257. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  258. client_id=CLIENT_ID,
  259. client_secret=CLIENT_SECRET,
  260. quota_project_id=self.QUOTA_PROJECT_ID,
  261. scopes=["email"],
  262. default_scopes=["default2"],
  263. )
  264. def test_with_quota_project(self):
  265. credentials = self.make_credentials()
  266. assert not credentials.scopes
  267. assert not credentials.quota_project_id
  268. quota_project_creds = credentials.with_quota_project("project-foo")
  269. assert quota_project_creds.quota_project_id == "project-foo"
  270. def test_with_quota_project_full_options_propagated(self):
  271. credentials = self.make_credentials(
  272. client_id=CLIENT_ID,
  273. client_secret=CLIENT_SECRET,
  274. quota_project_id=self.QUOTA_PROJECT_ID,
  275. scopes=self.SCOPES,
  276. default_scopes=["default1"],
  277. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  278. )
  279. with mock.patch.object(
  280. external_account.Credentials, "__init__", return_value=None
  281. ) as mock_init:
  282. credentials.with_quota_project("project-foo")
  283. # Confirm with_quota_project initialized the credential with the
  284. # expected parameters and quota project ID.
  285. mock_init.assert_called_once_with(
  286. audience=self.AUDIENCE,
  287. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  288. token_url=self.TOKEN_URL,
  289. credential_source=self.CREDENTIAL_SOURCE,
  290. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  291. client_id=CLIENT_ID,
  292. client_secret=CLIENT_SECRET,
  293. quota_project_id="project-foo",
  294. scopes=self.SCOPES,
  295. default_scopes=["default1"],
  296. )
  297. def test_with_invalid_impersonation_target_principal(self):
  298. invalid_url = "https://iamcredentials.googleapis.com/v1/invalid"
  299. with pytest.raises(exceptions.RefreshError) as excinfo:
  300. self.make_credentials(service_account_impersonation_url=invalid_url)
  301. assert excinfo.match(
  302. r"Unable to determine target principal from service account impersonation URL."
  303. )
  304. def test_info(self):
  305. credentials = self.make_credentials()
  306. assert credentials.info == {
  307. "type": "external_account",
  308. "audience": self.AUDIENCE,
  309. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  310. "token_url": self.TOKEN_URL,
  311. "credential_source": self.CREDENTIAL_SOURCE.copy(),
  312. }
  313. def test_info_with_full_options(self):
  314. credentials = self.make_credentials(
  315. client_id=CLIENT_ID,
  316. client_secret=CLIENT_SECRET,
  317. quota_project_id=self.QUOTA_PROJECT_ID,
  318. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  319. )
  320. assert credentials.info == {
  321. "type": "external_account",
  322. "audience": self.AUDIENCE,
  323. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  324. "token_url": self.TOKEN_URL,
  325. "service_account_impersonation_url": self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  326. "credential_source": self.CREDENTIAL_SOURCE.copy(),
  327. "quota_project_id": self.QUOTA_PROJECT_ID,
  328. "client_id": CLIENT_ID,
  329. "client_secret": CLIENT_SECRET,
  330. }
  331. def test_service_account_email_without_impersonation(self):
  332. credentials = self.make_credentials()
  333. assert credentials.service_account_email is None
  334. def test_service_account_email_with_impersonation(self):
  335. credentials = self.make_credentials(
  336. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
  337. )
  338. assert credentials.service_account_email == SERVICE_ACCOUNT_EMAIL
  339. @pytest.mark.parametrize(
  340. "audience",
  341. # Workload identity pool audiences or invalid workforce pool audiences.
  342. [
  343. # Legacy K8s audience format.
  344. "identitynamespace:1f12345:my_provider",
  345. (
  346. "//iam.googleapis.com/projects/123456/locations/"
  347. "global/workloadIdentityPools/pool-id/providers/"
  348. "provider-id"
  349. ),
  350. (
  351. "//iam.googleapis.com/projects/123456/locations/"
  352. "eu/workloadIdentityPools/pool-id/providers/"
  353. "provider-id"
  354. ),
  355. # Pool ID with workforcePools string.
  356. (
  357. "//iam.googleapis.com/projects/123456/locations/"
  358. "global/workloadIdentityPools/workforcePools/providers/"
  359. "provider-id"
  360. ),
  361. # Unrealistic / incorrect workforce pool audiences.
  362. "//iamgoogleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
  363. "//iam.googleapiscom/locations/eu/workforcePools/pool-id/providers/provider-id",
  364. "//iam.googleapis.com/locations/workforcePools/pool-id/providers/provider-id",
  365. "//iam.googleapis.com/locations/eu/workforcePool/pool-id/providers/provider-id",
  366. "//iam.googleapis.com/locations//workforcePool/pool-id/providers/provider-id",
  367. ],
  368. )
  369. def test_is_user_with_non_users(self, audience):
  370. credentials = CredentialsImpl(
  371. audience=audience,
  372. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  373. token_url=self.TOKEN_URL,
  374. credential_source=self.CREDENTIAL_SOURCE,
  375. )
  376. assert credentials.is_user is False
  377. @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
  378. def test_is_user_with_users(self, audience):
  379. credentials = CredentialsImpl(
  380. audience=audience,
  381. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  382. token_url=self.TOKEN_URL,
  383. credential_source=self.CREDENTIAL_SOURCE,
  384. )
  385. assert credentials.is_user is True
  386. @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
  387. def test_is_user_with_users_and_impersonation(self, audience):
  388. # Initialize the credentials with service account impersonation.
  389. credentials = CredentialsImpl(
  390. audience=audience,
  391. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  392. token_url=self.TOKEN_URL,
  393. credential_source=self.CREDENTIAL_SOURCE,
  394. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  395. )
  396. # Even though the audience is for a workforce pool, since service account
  397. # impersonation is used, the credentials will represent a service account and
  398. # not a user.
  399. assert credentials.is_user is False
  400. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  401. def test_refresh_without_client_auth_success(self, unused_utcnow):
  402. response = self.SUCCESS_RESPONSE.copy()
  403. # Test custom expiration to confirm expiry is set correctly.
  404. response["expires_in"] = 2800
  405. expected_expiry = datetime.datetime.min + datetime.timedelta(
  406. seconds=response["expires_in"]
  407. )
  408. headers = {"Content-Type": "application/x-www-form-urlencoded"}
  409. request_data = {
  410. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  411. "audience": self.AUDIENCE,
  412. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  413. "subject_token": "subject_token_0",
  414. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  415. }
  416. request = self.make_mock_request(status=http_client.OK, data=response)
  417. credentials = self.make_credentials()
  418. credentials.refresh(request)
  419. self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
  420. assert credentials.valid
  421. assert credentials.expiry == expected_expiry
  422. assert not credentials.expired
  423. assert credentials.token == response["access_token"]
  424. def test_refresh_impersonation_without_client_auth_success(self):
  425. # Simulate service account access token expires in 2800 seconds.
  426. expire_time = (
  427. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
  428. ).isoformat("T") + "Z"
  429. expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
  430. # STS token exchange request/response.
  431. token_response = self.SUCCESS_RESPONSE.copy()
  432. token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
  433. token_request_data = {
  434. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  435. "audience": self.AUDIENCE,
  436. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  437. "subject_token": "subject_token_0",
  438. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  439. "scope": "https://www.googleapis.com/auth/iam",
  440. }
  441. # Service account impersonation request/response.
  442. impersonation_response = {
  443. "accessToken": "SA_ACCESS_TOKEN",
  444. "expireTime": expire_time,
  445. }
  446. impersonation_headers = {
  447. "Content-Type": "application/json",
  448. "authorization": "Bearer {}".format(token_response["access_token"]),
  449. }
  450. impersonation_request_data = {
  451. "delegates": None,
  452. "scope": self.SCOPES,
  453. "lifetime": "3600s",
  454. }
  455. # Initialize mock request to handle token exchange and service account
  456. # impersonation request.
  457. request = self.make_mock_request(
  458. status=http_client.OK,
  459. data=token_response,
  460. impersonation_status=http_client.OK,
  461. impersonation_data=impersonation_response,
  462. )
  463. # Initialize credentials with service account impersonation.
  464. credentials = self.make_credentials(
  465. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  466. scopes=self.SCOPES,
  467. )
  468. credentials.refresh(request)
  469. # Only 2 requests should be processed.
  470. assert len(request.call_args_list) == 2
  471. # Verify token exchange request parameters.
  472. self.assert_token_request_kwargs(
  473. request.call_args_list[0][1], token_headers, token_request_data
  474. )
  475. # Verify service account impersonation request parameters.
  476. self.assert_impersonation_request_kwargs(
  477. request.call_args_list[1][1],
  478. impersonation_headers,
  479. impersonation_request_data,
  480. )
  481. assert credentials.valid
  482. assert credentials.expiry == expected_expiry
  483. assert not credentials.expired
  484. assert credentials.token == impersonation_response["accessToken"]
  485. def test_refresh_without_client_auth_success_explicit_user_scopes_ignore_default_scopes(
  486. self,
  487. ):
  488. headers = {"Content-Type": "application/x-www-form-urlencoded"}
  489. request_data = {
  490. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  491. "audience": self.AUDIENCE,
  492. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  493. "scope": "scope1 scope2",
  494. "subject_token": "subject_token_0",
  495. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  496. }
  497. request = self.make_mock_request(
  498. status=http_client.OK, data=self.SUCCESS_RESPONSE
  499. )
  500. credentials = self.make_credentials(
  501. scopes=["scope1", "scope2"],
  502. # Default scopes will be ignored in favor of user scopes.
  503. default_scopes=["ignored"],
  504. )
  505. credentials.refresh(request)
  506. self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
  507. assert credentials.valid
  508. assert not credentials.expired
  509. assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
  510. assert credentials.has_scopes(["scope1", "scope2"])
  511. assert not credentials.has_scopes(["ignored"])
  512. def test_refresh_without_client_auth_success_explicit_default_scopes_only(self):
  513. headers = {"Content-Type": "application/x-www-form-urlencoded"}
  514. request_data = {
  515. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  516. "audience": self.AUDIENCE,
  517. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  518. "scope": "scope1 scope2",
  519. "subject_token": "subject_token_0",
  520. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  521. }
  522. request = self.make_mock_request(
  523. status=http_client.OK, data=self.SUCCESS_RESPONSE
  524. )
  525. credentials = self.make_credentials(
  526. scopes=None,
  527. # Default scopes will be used since user scopes are none.
  528. default_scopes=["scope1", "scope2"],
  529. )
  530. credentials.refresh(request)
  531. self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
  532. assert credentials.valid
  533. assert not credentials.expired
  534. assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
  535. assert credentials.has_scopes(["scope1", "scope2"])
  536. def test_refresh_without_client_auth_error(self):
  537. request = self.make_mock_request(
  538. status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
  539. )
  540. credentials = self.make_credentials()
  541. with pytest.raises(exceptions.OAuthError) as excinfo:
  542. credentials.refresh(request)
  543. assert excinfo.match(
  544. r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
  545. )
  546. assert not credentials.expired
  547. assert credentials.token is None
  548. def test_refresh_impersonation_without_client_auth_error(self):
  549. request = self.make_mock_request(
  550. status=http_client.OK,
  551. data=self.SUCCESS_RESPONSE,
  552. impersonation_status=http_client.BAD_REQUEST,
  553. impersonation_data=self.IMPERSONATION_ERROR_RESPONSE,
  554. )
  555. credentials = self.make_credentials(
  556. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  557. scopes=self.SCOPES,
  558. )
  559. with pytest.raises(exceptions.RefreshError) as excinfo:
  560. credentials.refresh(request)
  561. assert excinfo.match(r"Unable to acquire impersonated credentials")
  562. assert not credentials.expired
  563. assert credentials.token is None
  564. def test_refresh_with_client_auth_success(self):
  565. headers = {
  566. "Content-Type": "application/x-www-form-urlencoded",
  567. "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
  568. }
  569. request_data = {
  570. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  571. "audience": self.AUDIENCE,
  572. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  573. "subject_token": "subject_token_0",
  574. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  575. }
  576. request = self.make_mock_request(
  577. status=http_client.OK, data=self.SUCCESS_RESPONSE
  578. )
  579. credentials = self.make_credentials(
  580. client_id=CLIENT_ID, client_secret=CLIENT_SECRET
  581. )
  582. credentials.refresh(request)
  583. self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
  584. assert credentials.valid
  585. assert not credentials.expired
  586. assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
  587. def test_refresh_impersonation_with_client_auth_success_ignore_default_scopes(self):
  588. # Simulate service account access token expires in 2800 seconds.
  589. expire_time = (
  590. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
  591. ).isoformat("T") + "Z"
  592. expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
  593. # STS token exchange request/response.
  594. token_response = self.SUCCESS_RESPONSE.copy()
  595. token_headers = {
  596. "Content-Type": "application/x-www-form-urlencoded",
  597. "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
  598. }
  599. token_request_data = {
  600. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  601. "audience": self.AUDIENCE,
  602. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  603. "subject_token": "subject_token_0",
  604. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  605. "scope": "https://www.googleapis.com/auth/iam",
  606. }
  607. # Service account impersonation request/response.
  608. impersonation_response = {
  609. "accessToken": "SA_ACCESS_TOKEN",
  610. "expireTime": expire_time,
  611. }
  612. impersonation_headers = {
  613. "Content-Type": "application/json",
  614. "authorization": "Bearer {}".format(token_response["access_token"]),
  615. }
  616. impersonation_request_data = {
  617. "delegates": None,
  618. "scope": self.SCOPES,
  619. "lifetime": "3600s",
  620. }
  621. # Initialize mock request to handle token exchange and service account
  622. # impersonation request.
  623. request = self.make_mock_request(
  624. status=http_client.OK,
  625. data=token_response,
  626. impersonation_status=http_client.OK,
  627. impersonation_data=impersonation_response,
  628. )
  629. # Initialize credentials with service account impersonation and basic auth.
  630. credentials = self.make_credentials(
  631. client_id=CLIENT_ID,
  632. client_secret=CLIENT_SECRET,
  633. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  634. scopes=self.SCOPES,
  635. # Default scopes will be ignored since user scopes are specified.
  636. default_scopes=["ignored"],
  637. )
  638. credentials.refresh(request)
  639. # Only 2 requests should be processed.
  640. assert len(request.call_args_list) == 2
  641. # Verify token exchange request parameters.
  642. self.assert_token_request_kwargs(
  643. request.call_args_list[0][1], token_headers, token_request_data
  644. )
  645. # Verify service account impersonation request parameters.
  646. self.assert_impersonation_request_kwargs(
  647. request.call_args_list[1][1],
  648. impersonation_headers,
  649. impersonation_request_data,
  650. )
  651. assert credentials.valid
  652. assert credentials.expiry == expected_expiry
  653. assert not credentials.expired
  654. assert credentials.token == impersonation_response["accessToken"]
  655. def test_refresh_impersonation_with_client_auth_success_use_default_scopes(self):
  656. # Simulate service account access token expires in 2800 seconds.
  657. expire_time = (
  658. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
  659. ).isoformat("T") + "Z"
  660. expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
  661. # STS token exchange request/response.
  662. token_response = self.SUCCESS_RESPONSE.copy()
  663. token_headers = {
  664. "Content-Type": "application/x-www-form-urlencoded",
  665. "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
  666. }
  667. token_request_data = {
  668. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  669. "audience": self.AUDIENCE,
  670. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  671. "subject_token": "subject_token_0",
  672. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  673. "scope": "https://www.googleapis.com/auth/iam",
  674. }
  675. # Service account impersonation request/response.
  676. impersonation_response = {
  677. "accessToken": "SA_ACCESS_TOKEN",
  678. "expireTime": expire_time,
  679. }
  680. impersonation_headers = {
  681. "Content-Type": "application/json",
  682. "authorization": "Bearer {}".format(token_response["access_token"]),
  683. }
  684. impersonation_request_data = {
  685. "delegates": None,
  686. "scope": self.SCOPES,
  687. "lifetime": "3600s",
  688. }
  689. # Initialize mock request to handle token exchange and service account
  690. # impersonation request.
  691. request = self.make_mock_request(
  692. status=http_client.OK,
  693. data=token_response,
  694. impersonation_status=http_client.OK,
  695. impersonation_data=impersonation_response,
  696. )
  697. # Initialize credentials with service account impersonation and basic auth.
  698. credentials = self.make_credentials(
  699. client_id=CLIENT_ID,
  700. client_secret=CLIENT_SECRET,
  701. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  702. scopes=None,
  703. # Default scopes will be used since user specified scopes are none.
  704. default_scopes=self.SCOPES,
  705. )
  706. credentials.refresh(request)
  707. # Only 2 requests should be processed.
  708. assert len(request.call_args_list) == 2
  709. # Verify token exchange request parameters.
  710. self.assert_token_request_kwargs(
  711. request.call_args_list[0][1], token_headers, token_request_data
  712. )
  713. # Verify service account impersonation request parameters.
  714. self.assert_impersonation_request_kwargs(
  715. request.call_args_list[1][1],
  716. impersonation_headers,
  717. impersonation_request_data,
  718. )
  719. assert credentials.valid
  720. assert credentials.expiry == expected_expiry
  721. assert not credentials.expired
  722. assert credentials.token == impersonation_response["accessToken"]
  723. def test_apply_without_quota_project_id(self):
  724. headers = {}
  725. request = self.make_mock_request(
  726. status=http_client.OK, data=self.SUCCESS_RESPONSE
  727. )
  728. credentials = self.make_credentials()
  729. credentials.refresh(request)
  730. credentials.apply(headers)
  731. assert headers == {
  732. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"])
  733. }
  734. def test_apply_impersonation_without_quota_project_id(self):
  735. expire_time = (
  736. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
  737. ).isoformat("T") + "Z"
  738. # Service account impersonation response.
  739. impersonation_response = {
  740. "accessToken": "SA_ACCESS_TOKEN",
  741. "expireTime": expire_time,
  742. }
  743. # Initialize mock request to handle token exchange and service account
  744. # impersonation request.
  745. request = self.make_mock_request(
  746. status=http_client.OK,
  747. data=self.SUCCESS_RESPONSE.copy(),
  748. impersonation_status=http_client.OK,
  749. impersonation_data=impersonation_response,
  750. )
  751. # Initialize credentials with service account impersonation.
  752. credentials = self.make_credentials(
  753. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  754. scopes=self.SCOPES,
  755. )
  756. headers = {}
  757. credentials.refresh(request)
  758. credentials.apply(headers)
  759. assert headers == {
  760. "authorization": "Bearer {}".format(impersonation_response["accessToken"])
  761. }
  762. def test_apply_with_quota_project_id(self):
  763. headers = {"other": "header-value"}
  764. request = self.make_mock_request(
  765. status=http_client.OK, data=self.SUCCESS_RESPONSE
  766. )
  767. credentials = self.make_credentials(quota_project_id=self.QUOTA_PROJECT_ID)
  768. credentials.refresh(request)
  769. credentials.apply(headers)
  770. assert headers == {
  771. "other": "header-value",
  772. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
  773. "x-goog-user-project": self.QUOTA_PROJECT_ID,
  774. }
  775. def test_apply_impersonation_with_quota_project_id(self):
  776. expire_time = (
  777. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
  778. ).isoformat("T") + "Z"
  779. # Service account impersonation response.
  780. impersonation_response = {
  781. "accessToken": "SA_ACCESS_TOKEN",
  782. "expireTime": expire_time,
  783. }
  784. # Initialize mock request to handle token exchange and service account
  785. # impersonation request.
  786. request = self.make_mock_request(
  787. status=http_client.OK,
  788. data=self.SUCCESS_RESPONSE.copy(),
  789. impersonation_status=http_client.OK,
  790. impersonation_data=impersonation_response,
  791. )
  792. # Initialize credentials with service account impersonation.
  793. credentials = self.make_credentials(
  794. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  795. scopes=self.SCOPES,
  796. quota_project_id=self.QUOTA_PROJECT_ID,
  797. )
  798. headers = {"other": "header-value"}
  799. credentials.refresh(request)
  800. credentials.apply(headers)
  801. assert headers == {
  802. "other": "header-value",
  803. "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
  804. "x-goog-user-project": self.QUOTA_PROJECT_ID,
  805. }
  806. def test_before_request(self):
  807. headers = {"other": "header-value"}
  808. request = self.make_mock_request(
  809. status=http_client.OK, data=self.SUCCESS_RESPONSE
  810. )
  811. credentials = self.make_credentials()
  812. # First call should call refresh, setting the token.
  813. credentials.before_request(request, "POST", "https://example.com/api", headers)
  814. assert headers == {
  815. "other": "header-value",
  816. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
  817. }
  818. # Second call shouldn't call refresh.
  819. credentials.before_request(request, "POST", "https://example.com/api", headers)
  820. assert headers == {
  821. "other": "header-value",
  822. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
  823. }
  824. def test_before_request_impersonation(self):
  825. expire_time = (
  826. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
  827. ).isoformat("T") + "Z"
  828. # Service account impersonation response.
  829. impersonation_response = {
  830. "accessToken": "SA_ACCESS_TOKEN",
  831. "expireTime": expire_time,
  832. }
  833. # Initialize mock request to handle token exchange and service account
  834. # impersonation request.
  835. request = self.make_mock_request(
  836. status=http_client.OK,
  837. data=self.SUCCESS_RESPONSE.copy(),
  838. impersonation_status=http_client.OK,
  839. impersonation_data=impersonation_response,
  840. )
  841. headers = {"other": "header-value"}
  842. credentials = self.make_credentials(
  843. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
  844. )
  845. # First call should call refresh, setting the token.
  846. credentials.before_request(request, "POST", "https://example.com/api", headers)
  847. assert headers == {
  848. "other": "header-value",
  849. "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
  850. }
  851. # Second call shouldn't call refresh.
  852. credentials.before_request(request, "POST", "https://example.com/api", headers)
  853. assert headers == {
  854. "other": "header-value",
  855. "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
  856. }
  857. @mock.patch("google.auth._helpers.utcnow")
  858. def test_before_request_expired(self, utcnow):
  859. headers = {}
  860. request = self.make_mock_request(
  861. status=http_client.OK, data=self.SUCCESS_RESPONSE
  862. )
  863. credentials = self.make_credentials()
  864. credentials.token = "token"
  865. utcnow.return_value = datetime.datetime.min
  866. # Set the expiration to one second more than now plus the clock skew
  867. # accomodation. These credentials should be valid.
  868. credentials.expiry = (
  869. datetime.datetime.min + _helpers.CLOCK_SKEW + datetime.timedelta(seconds=1)
  870. )
  871. assert credentials.valid
  872. assert not credentials.expired
  873. credentials.before_request(request, "POST", "https://example.com/api", headers)
  874. # Cached token should be used.
  875. assert headers == {"authorization": "Bearer token"}
  876. # Next call should simulate 1 second passed.
  877. utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)
  878. assert not credentials.valid
  879. assert credentials.expired
  880. credentials.before_request(request, "POST", "https://example.com/api", headers)
  881. # New token should be retrieved.
  882. assert headers == {
  883. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"])
  884. }
  885. @mock.patch("google.auth._helpers.utcnow")
  886. def test_before_request_impersonation_expired(self, utcnow):
  887. headers = {}
  888. expire_time = (
  889. datetime.datetime.min + datetime.timedelta(seconds=3601)
  890. ).isoformat("T") + "Z"
  891. # Service account impersonation response.
  892. impersonation_response = {
  893. "accessToken": "SA_ACCESS_TOKEN",
  894. "expireTime": expire_time,
  895. }
  896. # Initialize mock request to handle token exchange and service account
  897. # impersonation request.
  898. request = self.make_mock_request(
  899. status=http_client.OK,
  900. data=self.SUCCESS_RESPONSE.copy(),
  901. impersonation_status=http_client.OK,
  902. impersonation_data=impersonation_response,
  903. )
  904. credentials = self.make_credentials(
  905. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
  906. )
  907. credentials.token = "token"
  908. utcnow.return_value = datetime.datetime.min
  909. # Set the expiration to one second more than now plus the clock skew
  910. # accomodation. These credentials should be valid.
  911. credentials.expiry = (
  912. datetime.datetime.min + _helpers.CLOCK_SKEW + datetime.timedelta(seconds=1)
  913. )
  914. assert credentials.valid
  915. assert not credentials.expired
  916. credentials.before_request(request, "POST", "https://example.com/api", headers)
  917. # Cached token should be used.
  918. assert headers == {"authorization": "Bearer token"}
  919. # Next call should simulate 1 second passed. This will trigger the expiration
  920. # threshold.
  921. utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)
  922. assert not credentials.valid
  923. assert credentials.expired
  924. credentials.before_request(request, "POST", "https://example.com/api", headers)
  925. # New token should be retrieved.
  926. assert headers == {
  927. "authorization": "Bearer {}".format(impersonation_response["accessToken"])
  928. }
  929. @pytest.mark.parametrize(
  930. "audience",
  931. [
  932. # Legacy K8s audience format.
  933. "identitynamespace:1f12345:my_provider",
  934. # Unrealistic audiences.
  935. "//iam.googleapis.com/projects",
  936. "//iam.googleapis.com/projects/",
  937. "//iam.googleapis.com/project/123456",
  938. "//iam.googleapis.com/projects//123456",
  939. "//iam.googleapis.com/prefix_projects/123456",
  940. "//iam.googleapis.com/projects_suffix/123456",
  941. ],
  942. )
  943. def test_project_number_indeterminable(self, audience):
  944. credentials = CredentialsImpl(
  945. audience=audience,
  946. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  947. token_url=self.TOKEN_URL,
  948. credential_source=self.CREDENTIAL_SOURCE,
  949. )
  950. assert credentials.project_number is None
  951. assert credentials.get_project_id(None) is None
  952. def test_project_number_determinable(self):
  953. credentials = CredentialsImpl(
  954. audience=self.AUDIENCE,
  955. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  956. token_url=self.TOKEN_URL,
  957. credential_source=self.CREDENTIAL_SOURCE,
  958. )
  959. assert credentials.project_number == self.PROJECT_NUMBER
  960. def test_project_id_without_scopes(self):
  961. # Initialize credentials with no scopes.
  962. credentials = CredentialsImpl(
  963. audience=self.AUDIENCE,
  964. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  965. token_url=self.TOKEN_URL,
  966. credential_source=self.CREDENTIAL_SOURCE,
  967. )
  968. assert credentials.get_project_id(None) is None
  969. def test_get_project_id_cloud_resource_manager_success(self):
  970. # STS token exchange request/response.
  971. token_response = self.SUCCESS_RESPONSE.copy()
  972. token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
  973. token_request_data = {
  974. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  975. "audience": self.AUDIENCE,
  976. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  977. "subject_token": "subject_token_0",
  978. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  979. "scope": "https://www.googleapis.com/auth/iam",
  980. }
  981. # Service account impersonation request/response.
  982. expire_time = (
  983. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
  984. ).isoformat("T") + "Z"
  985. expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
  986. impersonation_response = {
  987. "accessToken": "SA_ACCESS_TOKEN",
  988. "expireTime": expire_time,
  989. }
  990. impersonation_headers = {
  991. "Content-Type": "application/json",
  992. "x-goog-user-project": self.QUOTA_PROJECT_ID,
  993. "authorization": "Bearer {}".format(token_response["access_token"]),
  994. }
  995. impersonation_request_data = {
  996. "delegates": None,
  997. "scope": self.SCOPES,
  998. "lifetime": "3600s",
  999. }
  1000. # Initialize mock request to handle token exchange, service account
  1001. # impersonation and cloud resource manager request.
  1002. request = self.make_mock_request(
  1003. status=http_client.OK,
  1004. data=self.SUCCESS_RESPONSE.copy(),
  1005. impersonation_status=http_client.OK,
  1006. impersonation_data=impersonation_response,
  1007. cloud_resource_manager_status=http_client.OK,
  1008. cloud_resource_manager_data=self.CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE,
  1009. )
  1010. credentials = self.make_credentials(
  1011. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  1012. scopes=self.SCOPES,
  1013. quota_project_id=self.QUOTA_PROJECT_ID,
  1014. )
  1015. # Expected project ID from cloud resource manager response should be returned.
  1016. project_id = credentials.get_project_id(request)
  1017. assert project_id == self.PROJECT_ID
  1018. # 3 requests should be processed.
  1019. assert len(request.call_args_list) == 3
  1020. # Verify token exchange request parameters.
  1021. self.assert_token_request_kwargs(
  1022. request.call_args_list[0][1], token_headers, token_request_data
  1023. )
  1024. # Verify service account impersonation request parameters.
  1025. self.assert_impersonation_request_kwargs(
  1026. request.call_args_list[1][1],
  1027. impersonation_headers,
  1028. impersonation_request_data,
  1029. )
  1030. # In the process of getting project ID, an access token should be
  1031. # retrieved.
  1032. assert credentials.valid
  1033. assert credentials.expiry == expected_expiry
  1034. assert not credentials.expired
  1035. assert credentials.token == impersonation_response["accessToken"]
  1036. # Verify cloud resource manager request parameters.
  1037. self.assert_resource_manager_request_kwargs(
  1038. request.call_args_list[2][1],
  1039. self.PROJECT_NUMBER,
  1040. {
  1041. "x-goog-user-project": self.QUOTA_PROJECT_ID,
  1042. "authorization": "Bearer {}".format(
  1043. impersonation_response["accessToken"]
  1044. ),
  1045. },
  1046. )
  1047. # Calling get_project_id again should return the cached project_id.
  1048. project_id = credentials.get_project_id(request)
  1049. assert project_id == self.PROJECT_ID
  1050. # No additional requests.
  1051. assert len(request.call_args_list) == 3
  1052. def test_get_project_id_cloud_resource_manager_error(self):
  1053. # Simulate resource doesn't have sufficient permissions to access
  1054. # cloud resource manager.
  1055. request = self.make_mock_request(
  1056. status=http_client.OK,
  1057. data=self.SUCCESS_RESPONSE.copy(),
  1058. cloud_resource_manager_status=http_client.UNAUTHORIZED,
  1059. )
  1060. credentials = self.make_credentials(scopes=self.SCOPES)
  1061. project_id = credentials.get_project_id(request)
  1062. assert project_id is None
  1063. # Only 2 requests to STS and cloud resource manager should be sent.
  1064. assert len(request.call_args_list) == 2