test_impersonated_credentials.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660
  1. # Copyright 2018 Google Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import datetime
  15. import http.client as http_client
  16. import json
  17. import os
  18. import mock
  19. import pytest # type: ignore
  20. from google.auth import _helpers
  21. from google.auth import crypt
  22. from google.auth import exceptions
  23. from google.auth import impersonated_credentials
  24. from google.auth import transport
  25. from google.auth.impersonated_credentials import Credentials
  26. from google.oauth2 import credentials
  27. from google.oauth2 import service_account
  28. import yatest.common as yc
  29. DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "data")
  30. with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
  31. PRIVATE_KEY_BYTES = fh.read()
  32. SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
  33. ID_TOKEN_DATA = (
  34. "eyJhbGciOiJSUzI1NiIsImtpZCI6ImRmMzc1ODkwOGI3OTIyOTNhZDk3N2Ew"
  35. "Yjk5MWQ5OGE3N2Y0ZWVlY2QiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwc"
  36. "zovL2Zvby5iYXIiLCJhenAiOiIxMDIxMDE1NTA4MzQyMDA3MDg1NjgiLCJle"
  37. "HAiOjE1NjQ0NzUwNTEsImlhdCI6MTU2NDQ3MTQ1MSwiaXNzIjoiaHR0cHM6L"
  38. "y9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTAyMTAxNTUwODM0MjAwN"
  39. "zA4NTY4In0.redacted"
  40. )
  41. ID_TOKEN_EXPIRY = 1564475051
  42. with open(SERVICE_ACCOUNT_JSON_FILE, "rb") as fh:
  43. SERVICE_ACCOUNT_INFO = json.load(fh)
  44. SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
  45. TOKEN_URI = "https://example.com/oauth2/token"
  46. ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
  47. "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/imp"
  48. )
  49. ID_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
  50. "gl-python/3.7 auth/1.1 auth-request-type/it cred-type/imp"
  51. )
  52. @pytest.fixture
  53. def mock_donor_credentials():
  54. with mock.patch("google.oauth2._client.jwt_grant", autospec=True) as grant:
  55. grant.return_value = (
  56. "source token",
  57. _helpers.utcnow() + datetime.timedelta(seconds=500),
  58. {},
  59. )
  60. yield grant
  61. class MockResponse:
  62. def __init__(self, json_data, status_code):
  63. self.json_data = json_data
  64. self.status_code = status_code
  65. def json(self):
  66. return self.json_data
  67. @pytest.fixture
  68. def mock_authorizedsession_sign():
  69. with mock.patch(
  70. "google.auth.transport.requests.AuthorizedSession.request", autospec=True
  71. ) as auth_session:
  72. data = {"keyId": "1", "signedBlob": "c2lnbmF0dXJl"}
  73. auth_session.return_value = MockResponse(data, http_client.OK)
  74. yield auth_session
  75. @pytest.fixture
  76. def mock_authorizedsession_idtoken():
  77. with mock.patch(
  78. "google.auth.transport.requests.AuthorizedSession.request", autospec=True
  79. ) as auth_session:
  80. data = {"token": ID_TOKEN_DATA}
  81. auth_session.return_value = MockResponse(data, http_client.OK)
  82. yield auth_session
  83. class TestImpersonatedCredentials(object):
  84. SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
  85. TARGET_PRINCIPAL = "impersonated@project.iam.gserviceaccount.com"
  86. TARGET_SCOPES = ["https://www.googleapis.com/auth/devstorage.read_only"]
  87. # DELEGATES: List[str] = []
  88. # Because Python 2.7:
  89. DELEGATES = [] # type: ignore
  90. LIFETIME = 3600
  91. SOURCE_CREDENTIALS = service_account.Credentials(
  92. SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI
  93. )
  94. USER_SOURCE_CREDENTIALS = credentials.Credentials(token="ABCDE")
  95. IAM_ENDPOINT_OVERRIDE = (
  96. "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
  97. + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
  98. )
  99. def make_credentials(
  100. self,
  101. source_credentials=SOURCE_CREDENTIALS,
  102. lifetime=LIFETIME,
  103. target_principal=TARGET_PRINCIPAL,
  104. iam_endpoint_override=None,
  105. ):
  106. return Credentials(
  107. source_credentials=source_credentials,
  108. target_principal=target_principal,
  109. target_scopes=self.TARGET_SCOPES,
  110. delegates=self.DELEGATES,
  111. lifetime=lifetime,
  112. iam_endpoint_override=iam_endpoint_override,
  113. )
  114. def test_make_from_user_credentials(self):
  115. credentials = self.make_credentials(
  116. source_credentials=self.USER_SOURCE_CREDENTIALS
  117. )
  118. assert not credentials.valid
  119. assert credentials.expired
  120. def test_default_state(self):
  121. credentials = self.make_credentials()
  122. assert not credentials.valid
  123. assert credentials.expired
  124. def test_make_from_service_account_self_signed_jwt(self):
  125. source_credentials = service_account.Credentials(
  126. SIGNER, self.SERVICE_ACCOUNT_EMAIL, TOKEN_URI, always_use_jwt_access=True
  127. )
  128. credentials = self.make_credentials(source_credentials=source_credentials)
  129. # test the source credential don't lose self signed jwt setting
  130. assert credentials._source_credentials._always_use_jwt_access
  131. assert credentials._source_credentials._jwt_credentials
  132. def make_request(
  133. self,
  134. data,
  135. status=http_client.OK,
  136. headers=None,
  137. side_effect=None,
  138. use_data_bytes=True,
  139. ):
  140. response = mock.create_autospec(transport.Response, instance=False)
  141. response.status = status
  142. response.data = _helpers.to_bytes(data) if use_data_bytes else data
  143. response.headers = headers or {}
  144. request = mock.create_autospec(transport.Request, instance=False)
  145. request.side_effect = side_effect
  146. request.return_value = response
  147. return request
  148. def test_token_usage_metrics(self):
  149. credentials = self.make_credentials()
  150. credentials.token = "token"
  151. credentials.expiry = None
  152. headers = {}
  153. credentials.before_request(mock.Mock(), None, None, headers)
  154. assert headers["authorization"] == "Bearer token"
  155. assert headers["x-goog-api-client"] == "cred-type/imp"
  156. @pytest.mark.parametrize("use_data_bytes", [True, False])
  157. def test_refresh_success(self, use_data_bytes, mock_donor_credentials):
  158. credentials = self.make_credentials(lifetime=None)
  159. token = "token"
  160. expire_time = (
  161. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  162. ).isoformat("T") + "Z"
  163. response_body = {"accessToken": token, "expireTime": expire_time}
  164. request = self.make_request(
  165. data=json.dumps(response_body),
  166. status=http_client.OK,
  167. use_data_bytes=use_data_bytes,
  168. )
  169. with mock.patch(
  170. "google.auth.metrics.token_request_access_token_impersonate",
  171. return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  172. ):
  173. credentials.refresh(request)
  174. assert credentials.valid
  175. assert not credentials.expired
  176. assert (
  177. request.call_args.kwargs["headers"]["x-goog-api-client"]
  178. == ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE
  179. )
  180. @pytest.mark.parametrize("use_data_bytes", [True, False])
  181. def test_refresh_success_iam_endpoint_override(
  182. self, use_data_bytes, mock_donor_credentials
  183. ):
  184. credentials = self.make_credentials(
  185. lifetime=None, iam_endpoint_override=self.IAM_ENDPOINT_OVERRIDE
  186. )
  187. token = "token"
  188. expire_time = (
  189. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  190. ).isoformat("T") + "Z"
  191. response_body = {"accessToken": token, "expireTime": expire_time}
  192. request = self.make_request(
  193. data=json.dumps(response_body),
  194. status=http_client.OK,
  195. use_data_bytes=use_data_bytes,
  196. )
  197. credentials.refresh(request)
  198. assert credentials.valid
  199. assert not credentials.expired
  200. # Confirm override endpoint used.
  201. request_kwargs = request.call_args[1]
  202. assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE
  203. @pytest.mark.parametrize("time_skew", [150, -150])
  204. def test_refresh_source_credentials(self, time_skew):
  205. credentials = self.make_credentials(lifetime=None)
  206. # Source credentials is refreshed only if it is expired within
  207. # _helpers.REFRESH_THRESHOLD from now. We add a time_skew to the expiry, so
  208. # source credentials is refreshed only if time_skew <= 0.
  209. credentials._source_credentials.expiry = (
  210. _helpers.utcnow()
  211. + _helpers.REFRESH_THRESHOLD
  212. + datetime.timedelta(seconds=time_skew)
  213. )
  214. credentials._source_credentials.token = "Token"
  215. with mock.patch(
  216. "google.oauth2.service_account.Credentials.refresh", autospec=True
  217. ) as source_cred_refresh:
  218. expire_time = (
  219. _helpers.utcnow().replace(microsecond=0)
  220. + datetime.timedelta(seconds=500)
  221. ).isoformat("T") + "Z"
  222. response_body = {"accessToken": "token", "expireTime": expire_time}
  223. request = self.make_request(
  224. data=json.dumps(response_body), status=http_client.OK
  225. )
  226. credentials.refresh(request)
  227. assert credentials.valid
  228. assert not credentials.expired
  229. # Source credentials is refreshed only if it is expired within
  230. # _helpers.REFRESH_THRESHOLD
  231. if time_skew > 0:
  232. source_cred_refresh.assert_not_called()
  233. else:
  234. source_cred_refresh.assert_called_once()
  235. def test_refresh_failure_malformed_expire_time(self, mock_donor_credentials):
  236. credentials = self.make_credentials(lifetime=None)
  237. token = "token"
  238. expire_time = (_helpers.utcnow() + datetime.timedelta(seconds=500)).isoformat(
  239. "T"
  240. )
  241. response_body = {"accessToken": token, "expireTime": expire_time}
  242. request = self.make_request(
  243. data=json.dumps(response_body), status=http_client.OK
  244. )
  245. with pytest.raises(exceptions.RefreshError) as excinfo:
  246. credentials.refresh(request)
  247. assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
  248. assert not credentials.valid
  249. assert credentials.expired
  250. def test_refresh_failure_unauthorzed(self, mock_donor_credentials):
  251. credentials = self.make_credentials(lifetime=None)
  252. response_body = {
  253. "error": {
  254. "code": 403,
  255. "message": "The caller does not have permission",
  256. "status": "PERMISSION_DENIED",
  257. }
  258. }
  259. request = self.make_request(
  260. data=json.dumps(response_body), status=http_client.UNAUTHORIZED
  261. )
  262. with pytest.raises(exceptions.RefreshError) as excinfo:
  263. credentials.refresh(request)
  264. assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
  265. assert not credentials.valid
  266. assert credentials.expired
  267. def test_refresh_failure(self):
  268. credentials = self.make_credentials(lifetime=None)
  269. credentials.expiry = None
  270. credentials.token = "token"
  271. id_creds = impersonated_credentials.IDTokenCredentials(
  272. credentials, target_audience="audience"
  273. )
  274. response = mock.create_autospec(transport.Response, instance=False)
  275. response.status_code = http_client.UNAUTHORIZED
  276. response.json = mock.Mock(return_value="failed to get ID token")
  277. with mock.patch(
  278. "google.auth.transport.requests.AuthorizedSession.post",
  279. return_value=response,
  280. ):
  281. with pytest.raises(exceptions.RefreshError) as excinfo:
  282. id_creds.refresh(None)
  283. assert excinfo.match("Error getting ID token")
  284. def test_refresh_failure_http_error(self, mock_donor_credentials):
  285. credentials = self.make_credentials(lifetime=None)
  286. response_body = {}
  287. request = self.make_request(
  288. data=json.dumps(response_body), status=http_client.HTTPException
  289. )
  290. with pytest.raises(exceptions.RefreshError) as excinfo:
  291. credentials.refresh(request)
  292. assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
  293. assert not credentials.valid
  294. assert credentials.expired
  295. def test_expired(self):
  296. credentials = self.make_credentials(lifetime=None)
  297. assert credentials.expired
  298. def test_signer(self):
  299. credentials = self.make_credentials()
  300. assert isinstance(credentials.signer, impersonated_credentials.Credentials)
  301. def test_signer_email(self):
  302. credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL)
  303. assert credentials.signer_email == self.TARGET_PRINCIPAL
  304. def test_service_account_email(self):
  305. credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL)
  306. assert credentials.service_account_email == self.TARGET_PRINCIPAL
  307. def test_sign_bytes(self, mock_donor_credentials, mock_authorizedsession_sign):
  308. credentials = self.make_credentials(lifetime=None)
  309. token = "token"
  310. expire_time = (
  311. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  312. ).isoformat("T") + "Z"
  313. token_response_body = {"accessToken": token, "expireTime": expire_time}
  314. response = mock.create_autospec(transport.Response, instance=False)
  315. response.status = http_client.OK
  316. response.data = _helpers.to_bytes(json.dumps(token_response_body))
  317. request = mock.create_autospec(transport.Request, instance=False)
  318. request.return_value = response
  319. credentials.refresh(request)
  320. assert credentials.valid
  321. assert not credentials.expired
  322. signature = credentials.sign_bytes(b"signed bytes")
  323. assert signature == b"signature"
  324. def test_sign_bytes_failure(self):
  325. credentials = self.make_credentials(lifetime=None)
  326. with mock.patch(
  327. "google.auth.transport.requests.AuthorizedSession.request", autospec=True
  328. ) as auth_session:
  329. data = {"error": {"code": 403, "message": "unauthorized"}}
  330. auth_session.return_value = MockResponse(data, http_client.FORBIDDEN)
  331. with pytest.raises(exceptions.TransportError) as excinfo:
  332. credentials.sign_bytes(b"foo")
  333. assert excinfo.match("'code': 403")
  334. def test_with_quota_project(self):
  335. credentials = self.make_credentials()
  336. quota_project_creds = credentials.with_quota_project("project-foo")
  337. assert quota_project_creds._quota_project_id == "project-foo"
  338. @pytest.mark.parametrize("use_data_bytes", [True, False])
  339. def test_with_quota_project_iam_endpoint_override(
  340. self, use_data_bytes, mock_donor_credentials
  341. ):
  342. credentials = self.make_credentials(
  343. lifetime=None, iam_endpoint_override=self.IAM_ENDPOINT_OVERRIDE
  344. )
  345. token = "token"
  346. # iam_endpoint_override should be copied to created credentials.
  347. quota_project_creds = credentials.with_quota_project("project-foo")
  348. expire_time = (
  349. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  350. ).isoformat("T") + "Z"
  351. response_body = {"accessToken": token, "expireTime": expire_time}
  352. request = self.make_request(
  353. data=json.dumps(response_body),
  354. status=http_client.OK,
  355. use_data_bytes=use_data_bytes,
  356. )
  357. quota_project_creds.refresh(request)
  358. assert quota_project_creds.valid
  359. assert not quota_project_creds.expired
  360. # Confirm override endpoint used.
  361. request_kwargs = request.call_args[1]
  362. assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE
  363. def test_with_scopes(self):
  364. credentials = self.make_credentials()
  365. credentials._target_scopes = []
  366. assert credentials.requires_scopes is True
  367. credentials = credentials.with_scopes(["fake_scope1", "fake_scope2"])
  368. assert credentials.requires_scopes is False
  369. assert credentials._target_scopes == ["fake_scope1", "fake_scope2"]
  370. def test_with_scopes_provide_default_scopes(self):
  371. credentials = self.make_credentials()
  372. credentials._target_scopes = []
  373. credentials = credentials.with_scopes(
  374. ["fake_scope1"], default_scopes=["fake_scope2"]
  375. )
  376. assert credentials._target_scopes == ["fake_scope1"]
  377. def test_id_token_success(
  378. self, mock_donor_credentials, mock_authorizedsession_idtoken
  379. ):
  380. credentials = self.make_credentials(lifetime=None)
  381. token = "token"
  382. target_audience = "https://foo.bar"
  383. expire_time = (
  384. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  385. ).isoformat("T") + "Z"
  386. response_body = {"accessToken": token, "expireTime": expire_time}
  387. request = self.make_request(
  388. data=json.dumps(response_body), status=http_client.OK
  389. )
  390. credentials.refresh(request)
  391. assert credentials.valid
  392. assert not credentials.expired
  393. id_creds = impersonated_credentials.IDTokenCredentials(
  394. credentials, target_audience=target_audience
  395. )
  396. id_creds.refresh(request)
  397. assert id_creds.token == ID_TOKEN_DATA
  398. assert id_creds.expiry == datetime.datetime.utcfromtimestamp(ID_TOKEN_EXPIRY)
  399. def test_id_token_metrics(self, mock_donor_credentials):
  400. credentials = self.make_credentials(lifetime=None)
  401. credentials.token = "token"
  402. credentials.expiry = None
  403. target_audience = "https://foo.bar"
  404. id_creds = impersonated_credentials.IDTokenCredentials(
  405. credentials, target_audience=target_audience
  406. )
  407. with mock.patch(
  408. "google.auth.metrics.token_request_id_token_impersonate",
  409. return_value=ID_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  410. ):
  411. with mock.patch(
  412. "google.auth.transport.requests.AuthorizedSession.post", autospec=True
  413. ) as mock_post:
  414. data = {"token": ID_TOKEN_DATA}
  415. mock_post.return_value = MockResponse(data, http_client.OK)
  416. id_creds.refresh(None)
  417. assert id_creds.token == ID_TOKEN_DATA
  418. assert id_creds.expiry == datetime.datetime.utcfromtimestamp(
  419. ID_TOKEN_EXPIRY
  420. )
  421. assert (
  422. mock_post.call_args.kwargs["headers"]["x-goog-api-client"]
  423. == ID_TOKEN_REQUEST_METRICS_HEADER_VALUE
  424. )
  425. def test_id_token_from_credential(
  426. self, mock_donor_credentials, mock_authorizedsession_idtoken
  427. ):
  428. credentials = self.make_credentials(lifetime=None)
  429. token = "token"
  430. target_audience = "https://foo.bar"
  431. expire_time = (
  432. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  433. ).isoformat("T") + "Z"
  434. response_body = {"accessToken": token, "expireTime": expire_time}
  435. request = self.make_request(
  436. data=json.dumps(response_body), status=http_client.OK
  437. )
  438. credentials.refresh(request)
  439. assert credentials.valid
  440. assert not credentials.expired
  441. new_credentials = self.make_credentials(lifetime=None)
  442. id_creds = impersonated_credentials.IDTokenCredentials(
  443. credentials, target_audience=target_audience, include_email=True
  444. )
  445. id_creds = id_creds.from_credentials(target_credentials=new_credentials)
  446. id_creds.refresh(request)
  447. assert id_creds.token == ID_TOKEN_DATA
  448. assert id_creds._include_email is True
  449. assert id_creds._target_credentials is new_credentials
  450. def test_id_token_with_target_audience(
  451. self, mock_donor_credentials, mock_authorizedsession_idtoken
  452. ):
  453. credentials = self.make_credentials(lifetime=None)
  454. token = "token"
  455. target_audience = "https://foo.bar"
  456. expire_time = (
  457. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  458. ).isoformat("T") + "Z"
  459. response_body = {"accessToken": token, "expireTime": expire_time}
  460. request = self.make_request(
  461. data=json.dumps(response_body), status=http_client.OK
  462. )
  463. credentials.refresh(request)
  464. assert credentials.valid
  465. assert not credentials.expired
  466. id_creds = impersonated_credentials.IDTokenCredentials(
  467. credentials, include_email=True
  468. )
  469. id_creds = id_creds.with_target_audience(target_audience=target_audience)
  470. id_creds.refresh(request)
  471. assert id_creds.token == ID_TOKEN_DATA
  472. assert id_creds.expiry == datetime.datetime.utcfromtimestamp(ID_TOKEN_EXPIRY)
  473. assert id_creds._include_email is True
  474. def test_id_token_invalid_cred(
  475. self, mock_donor_credentials, mock_authorizedsession_idtoken
  476. ):
  477. credentials = None
  478. with pytest.raises(exceptions.GoogleAuthError) as excinfo:
  479. impersonated_credentials.IDTokenCredentials(credentials)
  480. assert excinfo.match("Provided Credential must be" " impersonated_credentials")
  481. def test_id_token_with_include_email(
  482. self, mock_donor_credentials, mock_authorizedsession_idtoken
  483. ):
  484. credentials = self.make_credentials(lifetime=None)
  485. token = "token"
  486. target_audience = "https://foo.bar"
  487. expire_time = (
  488. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  489. ).isoformat("T") + "Z"
  490. response_body = {"accessToken": token, "expireTime": expire_time}
  491. request = self.make_request(
  492. data=json.dumps(response_body), status=http_client.OK
  493. )
  494. credentials.refresh(request)
  495. assert credentials.valid
  496. assert not credentials.expired
  497. id_creds = impersonated_credentials.IDTokenCredentials(
  498. credentials, target_audience=target_audience
  499. )
  500. id_creds = id_creds.with_include_email(True)
  501. id_creds.refresh(request)
  502. assert id_creds.token == ID_TOKEN_DATA
  503. def test_id_token_with_quota_project(
  504. self, mock_donor_credentials, mock_authorizedsession_idtoken
  505. ):
  506. credentials = self.make_credentials(lifetime=None)
  507. token = "token"
  508. target_audience = "https://foo.bar"
  509. expire_time = (
  510. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  511. ).isoformat("T") + "Z"
  512. response_body = {"accessToken": token, "expireTime": expire_time}
  513. request = self.make_request(
  514. data=json.dumps(response_body), status=http_client.OK
  515. )
  516. credentials.refresh(request)
  517. assert credentials.valid
  518. assert not credentials.expired
  519. id_creds = impersonated_credentials.IDTokenCredentials(
  520. credentials, target_audience=target_audience
  521. )
  522. id_creds = id_creds.with_quota_project("project-foo")
  523. id_creds.refresh(request)
  524. assert id_creds.quota_project_id == "project-foo"