test_impersonated_credentials.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813
  1. # Copyright 2018 Google Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import datetime
  15. import http.client as http_client
  16. import json
  17. import os
  18. import mock
  19. import pytest # type: ignore
  20. from google.auth import _helpers
  21. from google.auth import crypt
  22. from google.auth import exceptions
  23. from google.auth import impersonated_credentials
  24. from google.auth import transport
  25. from google.auth.impersonated_credentials import Credentials
  26. from google.oauth2 import credentials
  27. from google.oauth2 import service_account
  28. import yatest.common as yc
  29. DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "data")
  30. with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
  31. PRIVATE_KEY_BYTES = fh.read()
  32. SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
  33. ID_TOKEN_DATA = (
  34. "eyJhbGciOiJSUzI1NiIsImtpZCI6ImRmMzc1ODkwOGI3OTIyOTNhZDk3N2Ew"
  35. "Yjk5MWQ5OGE3N2Y0ZWVlY2QiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwc"
  36. "zovL2Zvby5iYXIiLCJhenAiOiIxMDIxMDE1NTA4MzQyMDA3MDg1NjgiLCJle"
  37. "HAiOjE1NjQ0NzUwNTEsImlhdCI6MTU2NDQ3MTQ1MSwiaXNzIjoiaHR0cHM6L"
  38. "y9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTAyMTAxNTUwODM0MjAwN"
  39. "zA4NTY4In0.redacted"
  40. )
  41. ID_TOKEN_EXPIRY = 1564475051
  42. with open(SERVICE_ACCOUNT_JSON_FILE, "rb") as fh:
  43. SERVICE_ACCOUNT_INFO = json.load(fh)
  44. SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
  45. TOKEN_URI = "https://example.com/oauth2/token"
  46. ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
  47. "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/imp"
  48. )
  49. ID_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
  50. "gl-python/3.7 auth/1.1 auth-request-type/it cred-type/imp"
  51. )
  52. @pytest.fixture
  53. def mock_donor_credentials():
  54. with mock.patch("google.oauth2._client.jwt_grant", autospec=True) as grant:
  55. grant.return_value = (
  56. "source token",
  57. _helpers.utcnow() + datetime.timedelta(seconds=500),
  58. {},
  59. )
  60. yield grant
  61. class MockResponse:
  62. def __init__(self, json_data, status_code):
  63. self.json_data = json_data
  64. self.status_code = status_code
  65. def json(self):
  66. return self.json_data
  67. @pytest.fixture
  68. def mock_authorizedsession_sign():
  69. with mock.patch(
  70. "google.auth.transport.requests.AuthorizedSession.request", autospec=True
  71. ) as auth_session:
  72. data = {"keyId": "1", "signedBlob": "c2lnbmF0dXJl"}
  73. auth_session.return_value = MockResponse(data, http_client.OK)
  74. yield auth_session
  75. @pytest.fixture
  76. def mock_authorizedsession_idtoken():
  77. with mock.patch(
  78. "google.auth.transport.requests.AuthorizedSession.request", autospec=True
  79. ) as auth_session:
  80. data = {"token": ID_TOKEN_DATA}
  81. auth_session.return_value = MockResponse(data, http_client.OK)
  82. yield auth_session
  83. class TestImpersonatedCredentials(object):
  84. SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
  85. TARGET_PRINCIPAL = "impersonated@project.iam.gserviceaccount.com"
  86. TARGET_SCOPES = ["https://www.googleapis.com/auth/devstorage.read_only"]
  87. # DELEGATES: List[str] = []
  88. # Because Python 2.7:
  89. DELEGATES = [] # type: ignore
  90. LIFETIME = 3600
  91. SOURCE_CREDENTIALS = service_account.Credentials(
  92. SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI
  93. )
  94. USER_SOURCE_CREDENTIALS = credentials.Credentials(token="ABCDE")
  95. IAM_ENDPOINT_OVERRIDE = (
  96. "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
  97. + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
  98. )
  99. def make_credentials(
  100. self,
  101. source_credentials=SOURCE_CREDENTIALS,
  102. lifetime=LIFETIME,
  103. target_principal=TARGET_PRINCIPAL,
  104. iam_endpoint_override=None,
  105. ):
  106. return Credentials(
  107. source_credentials=source_credentials,
  108. target_principal=target_principal,
  109. target_scopes=self.TARGET_SCOPES,
  110. delegates=self.DELEGATES,
  111. lifetime=lifetime,
  112. iam_endpoint_override=iam_endpoint_override,
  113. )
  114. def test_get_cred_info(self):
  115. credentials = self.make_credentials()
  116. assert not credentials.get_cred_info()
  117. credentials._cred_file_path = "/path/to/file"
  118. assert credentials.get_cred_info() == {
  119. "credential_source": "/path/to/file",
  120. "credential_type": "impersonated credentials",
  121. "principal": "impersonated@project.iam.gserviceaccount.com",
  122. }
  123. def test_universe_domain_matching_source(self):
  124. source_credentials = service_account.Credentials(
  125. SIGNER, "some@email.com", TOKEN_URI, universe_domain="foo.bar"
  126. )
  127. credentials = self.make_credentials(source_credentials=source_credentials)
  128. assert credentials.universe_domain == "foo.bar"
  129. def test__make_copy_get_cred_info(self):
  130. credentials = self.make_credentials()
  131. credentials._cred_file_path = "/path/to/file"
  132. cred_copy = credentials._make_copy()
  133. assert cred_copy._cred_file_path == "/path/to/file"
  134. def test_make_from_user_credentials(self):
  135. credentials = self.make_credentials(
  136. source_credentials=self.USER_SOURCE_CREDENTIALS
  137. )
  138. assert not credentials.valid
  139. assert credentials.expired
  140. def test_default_state(self):
  141. credentials = self.make_credentials()
  142. assert not credentials.valid
  143. assert credentials.expired
  144. def test_make_from_service_account_self_signed_jwt(self):
  145. source_credentials = service_account.Credentials(
  146. SIGNER, self.SERVICE_ACCOUNT_EMAIL, TOKEN_URI, always_use_jwt_access=True
  147. )
  148. credentials = self.make_credentials(source_credentials=source_credentials)
  149. # test the source credential don't lose self signed jwt setting
  150. assert credentials._source_credentials._always_use_jwt_access
  151. assert credentials._source_credentials._jwt_credentials
  152. def make_request(
  153. self,
  154. data,
  155. status=http_client.OK,
  156. headers=None,
  157. side_effect=None,
  158. use_data_bytes=True,
  159. ):
  160. response = mock.create_autospec(transport.Response, instance=False)
  161. response.status = status
  162. response.data = _helpers.to_bytes(data) if use_data_bytes else data
  163. response.headers = headers or {}
  164. request = mock.create_autospec(transport.Request, instance=False)
  165. request.side_effect = side_effect
  166. request.return_value = response
  167. return request
  168. def test_token_usage_metrics(self):
  169. credentials = self.make_credentials()
  170. credentials.token = "token"
  171. credentials.expiry = None
  172. headers = {}
  173. credentials.before_request(mock.Mock(), None, None, headers)
  174. assert headers["authorization"] == "Bearer token"
  175. assert headers["x-goog-api-client"] == "cred-type/imp"
  176. @pytest.mark.parametrize("use_data_bytes", [True, False])
  177. def test_refresh_success(self, use_data_bytes, mock_donor_credentials):
  178. credentials = self.make_credentials(lifetime=None)
  179. token = "token"
  180. expire_time = (
  181. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  182. ).isoformat("T") + "Z"
  183. response_body = {"accessToken": token, "expireTime": expire_time}
  184. request = self.make_request(
  185. data=json.dumps(response_body),
  186. status=http_client.OK,
  187. use_data_bytes=use_data_bytes,
  188. )
  189. with mock.patch(
  190. "google.auth.metrics.token_request_access_token_impersonate",
  191. return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  192. ):
  193. credentials.refresh(request)
  194. assert credentials.valid
  195. assert not credentials.expired
  196. assert (
  197. request.call_args.kwargs["headers"]["x-goog-api-client"]
  198. == ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE
  199. )
  200. @pytest.mark.parametrize("use_data_bytes", [True, False])
  201. def test_refresh_success_nonGdu(self, use_data_bytes, mock_donor_credentials):
  202. source_credentials = service_account.Credentials(
  203. SIGNER, "some@email.com", TOKEN_URI, universe_domain="foo.bar"
  204. )
  205. credentials = self.make_credentials(
  206. lifetime=None, source_credentials=source_credentials
  207. )
  208. token = "token"
  209. expire_time = (
  210. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  211. ).isoformat("T") + "Z"
  212. response_body = {"accessToken": token, "expireTime": expire_time}
  213. request = self.make_request(
  214. data=json.dumps(response_body),
  215. status=http_client.OK,
  216. use_data_bytes=use_data_bytes,
  217. )
  218. credentials.refresh(request)
  219. assert credentials.valid
  220. assert not credentials.expired
  221. # Confirm override endpoint used.
  222. request_kwargs = request.call_args[1]
  223. assert (
  224. request_kwargs["url"]
  225. == "https://iamcredentials.foo.bar/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:generateAccessToken"
  226. )
  227. @pytest.mark.parametrize("use_data_bytes", [True, False])
  228. def test_refresh_success_iam_endpoint_override(
  229. self, use_data_bytes, mock_donor_credentials
  230. ):
  231. credentials = self.make_credentials(
  232. lifetime=None, iam_endpoint_override=self.IAM_ENDPOINT_OVERRIDE
  233. )
  234. token = "token"
  235. expire_time = (
  236. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  237. ).isoformat("T") + "Z"
  238. response_body = {"accessToken": token, "expireTime": expire_time}
  239. request = self.make_request(
  240. data=json.dumps(response_body),
  241. status=http_client.OK,
  242. use_data_bytes=use_data_bytes,
  243. )
  244. credentials.refresh(request)
  245. assert credentials.valid
  246. assert not credentials.expired
  247. # Confirm override endpoint used.
  248. request_kwargs = request.call_args[1]
  249. assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE
  250. @pytest.mark.parametrize("time_skew", [150, -150])
  251. def test_refresh_source_credentials(self, time_skew):
  252. credentials = self.make_credentials(lifetime=None)
  253. # Source credentials is refreshed only if it is expired within
  254. # _helpers.REFRESH_THRESHOLD from now. We add a time_skew to the expiry, so
  255. # source credentials is refreshed only if time_skew <= 0.
  256. credentials._source_credentials.expiry = (
  257. _helpers.utcnow()
  258. + _helpers.REFRESH_THRESHOLD
  259. + datetime.timedelta(seconds=time_skew)
  260. )
  261. credentials._source_credentials.token = "Token"
  262. with mock.patch(
  263. "google.oauth2.service_account.Credentials.refresh", autospec=True
  264. ) as source_cred_refresh:
  265. expire_time = (
  266. _helpers.utcnow().replace(microsecond=0)
  267. + datetime.timedelta(seconds=500)
  268. ).isoformat("T") + "Z"
  269. response_body = {"accessToken": "token", "expireTime": expire_time}
  270. request = self.make_request(
  271. data=json.dumps(response_body), status=http_client.OK
  272. )
  273. credentials.refresh(request)
  274. assert credentials.valid
  275. assert not credentials.expired
  276. # Source credentials is refreshed only if it is expired within
  277. # _helpers.REFRESH_THRESHOLD
  278. if time_skew > 0:
  279. source_cred_refresh.assert_not_called()
  280. else:
  281. source_cred_refresh.assert_called_once()
  282. def test_refresh_failure_malformed_expire_time(self, mock_donor_credentials):
  283. credentials = self.make_credentials(lifetime=None)
  284. token = "token"
  285. expire_time = (_helpers.utcnow() + datetime.timedelta(seconds=500)).isoformat(
  286. "T"
  287. )
  288. response_body = {"accessToken": token, "expireTime": expire_time}
  289. request = self.make_request(
  290. data=json.dumps(response_body), status=http_client.OK
  291. )
  292. with pytest.raises(exceptions.RefreshError) as excinfo:
  293. credentials.refresh(request)
  294. assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
  295. assert not credentials.valid
  296. assert credentials.expired
  297. def test_refresh_failure_unauthorzed(self, mock_donor_credentials):
  298. credentials = self.make_credentials(lifetime=None)
  299. response_body = {
  300. "error": {
  301. "code": 403,
  302. "message": "The caller does not have permission",
  303. "status": "PERMISSION_DENIED",
  304. }
  305. }
  306. request = self.make_request(
  307. data=json.dumps(response_body), status=http_client.UNAUTHORIZED
  308. )
  309. with pytest.raises(exceptions.RefreshError) as excinfo:
  310. credentials.refresh(request)
  311. assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
  312. assert not credentials.valid
  313. assert credentials.expired
  314. def test_refresh_failure(self):
  315. credentials = self.make_credentials(lifetime=None)
  316. credentials.expiry = None
  317. credentials.token = "token"
  318. id_creds = impersonated_credentials.IDTokenCredentials(
  319. credentials, target_audience="audience"
  320. )
  321. response = mock.create_autospec(transport.Response, instance=False)
  322. response.status_code = http_client.UNAUTHORIZED
  323. response.json = mock.Mock(return_value="failed to get ID token")
  324. with mock.patch(
  325. "google.auth.transport.requests.AuthorizedSession.post",
  326. return_value=response,
  327. ):
  328. with pytest.raises(exceptions.RefreshError) as excinfo:
  329. id_creds.refresh(None)
  330. assert excinfo.match("Error getting ID token")
  331. def test_refresh_failure_http_error(self, mock_donor_credentials):
  332. credentials = self.make_credentials(lifetime=None)
  333. response_body = {}
  334. request = self.make_request(
  335. data=json.dumps(response_body), status=http_client.HTTPException
  336. )
  337. with pytest.raises(exceptions.RefreshError) as excinfo:
  338. credentials.refresh(request)
  339. assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
  340. assert not credentials.valid
  341. assert credentials.expired
  342. def test_expired(self):
  343. credentials = self.make_credentials(lifetime=None)
  344. assert credentials.expired
  345. def test_signer(self):
  346. credentials = self.make_credentials()
  347. assert isinstance(credentials.signer, impersonated_credentials.Credentials)
  348. def test_signer_email(self):
  349. credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL)
  350. assert credentials.signer_email == self.TARGET_PRINCIPAL
  351. def test_service_account_email(self):
  352. credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL)
  353. assert credentials.service_account_email == self.TARGET_PRINCIPAL
  354. def test_sign_bytes(self, mock_donor_credentials, mock_authorizedsession_sign):
  355. credentials = self.make_credentials(lifetime=None)
  356. expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:signBlob"
  357. self._sign_bytes_helper(
  358. credentials,
  359. mock_donor_credentials,
  360. mock_authorizedsession_sign,
  361. expected_url,
  362. )
  363. def test_sign_bytes_nonGdu(
  364. self, mock_donor_credentials, mock_authorizedsession_sign
  365. ):
  366. source_credentials = service_account.Credentials(
  367. SIGNER, "some@email.com", TOKEN_URI, universe_domain="foo.bar"
  368. )
  369. credentials = self.make_credentials(
  370. lifetime=None, source_credentials=source_credentials
  371. )
  372. expected_url = "https://iamcredentials.foo.bar/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:signBlob"
  373. self._sign_bytes_helper(
  374. credentials,
  375. mock_donor_credentials,
  376. mock_authorizedsession_sign,
  377. expected_url,
  378. )
  379. def _sign_bytes_helper(
  380. self,
  381. credentials,
  382. mock_donor_credentials,
  383. mock_authorizedsession_sign,
  384. expected_url,
  385. ):
  386. token = "token"
  387. expire_time = (
  388. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  389. ).isoformat("T") + "Z"
  390. token_response_body = {"accessToken": token, "expireTime": expire_time}
  391. response = mock.create_autospec(transport.Response, instance=False)
  392. response.status = http_client.OK
  393. response.data = _helpers.to_bytes(json.dumps(token_response_body))
  394. request = mock.create_autospec(transport.Request, instance=False)
  395. request.return_value = response
  396. credentials.refresh(request)
  397. assert credentials.valid
  398. assert not credentials.expired
  399. signature = credentials.sign_bytes(b"signed bytes")
  400. mock_authorizedsession_sign.assert_called_with(
  401. mock.ANY,
  402. "POST",
  403. expected_url,
  404. None,
  405. json={"payload": "c2lnbmVkIGJ5dGVz", "delegates": []},
  406. headers={"Content-Type": "application/json"},
  407. )
  408. assert signature == b"signature"
  409. def test_sign_bytes_failure(self):
  410. credentials = self.make_credentials(lifetime=None)
  411. with mock.patch(
  412. "google.auth.transport.requests.AuthorizedSession.request", autospec=True
  413. ) as auth_session:
  414. data = {"error": {"code": 403, "message": "unauthorized"}}
  415. mock_response = MockResponse(data, http_client.UNAUTHORIZED)
  416. auth_session.return_value = mock_response
  417. with pytest.raises(exceptions.TransportError) as excinfo:
  418. credentials.sign_bytes(b"foo")
  419. assert excinfo.match("'code': 403")
  420. @mock.patch("time.sleep", return_value=None)
  421. def test_sign_bytes_retryable_failure(self, mock_time):
  422. credentials = self.make_credentials(lifetime=None)
  423. with mock.patch(
  424. "google.auth.transport.requests.AuthorizedSession.request", autospec=True
  425. ) as auth_session:
  426. data = {"error": {"code": 500, "message": "internal_failure"}}
  427. mock_response = MockResponse(data, http_client.INTERNAL_SERVER_ERROR)
  428. auth_session.return_value = mock_response
  429. with pytest.raises(exceptions.TransportError) as excinfo:
  430. credentials.sign_bytes(b"foo")
  431. assert excinfo.match("exhausted signBlob endpoint retries")
  432. def test_with_quota_project(self):
  433. credentials = self.make_credentials()
  434. quota_project_creds = credentials.with_quota_project("project-foo")
  435. assert quota_project_creds._quota_project_id == "project-foo"
  436. @pytest.mark.parametrize("use_data_bytes", [True, False])
  437. def test_with_quota_project_iam_endpoint_override(
  438. self, use_data_bytes, mock_donor_credentials
  439. ):
  440. credentials = self.make_credentials(
  441. lifetime=None, iam_endpoint_override=self.IAM_ENDPOINT_OVERRIDE
  442. )
  443. token = "token"
  444. # iam_endpoint_override should be copied to created credentials.
  445. quota_project_creds = credentials.with_quota_project("project-foo")
  446. expire_time = (
  447. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  448. ).isoformat("T") + "Z"
  449. response_body = {"accessToken": token, "expireTime": expire_time}
  450. request = self.make_request(
  451. data=json.dumps(response_body),
  452. status=http_client.OK,
  453. use_data_bytes=use_data_bytes,
  454. )
  455. quota_project_creds.refresh(request)
  456. assert quota_project_creds.valid
  457. assert not quota_project_creds.expired
  458. # Confirm override endpoint used.
  459. request_kwargs = request.call_args[1]
  460. assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE
  461. def test_with_scopes(self):
  462. credentials = self.make_credentials()
  463. credentials._target_scopes = []
  464. assert credentials.requires_scopes is True
  465. credentials = credentials.with_scopes(["fake_scope1", "fake_scope2"])
  466. assert credentials.requires_scopes is False
  467. assert credentials._target_scopes == ["fake_scope1", "fake_scope2"]
  468. def test_with_scopes_provide_default_scopes(self):
  469. credentials = self.make_credentials()
  470. credentials._target_scopes = []
  471. credentials = credentials.with_scopes(
  472. ["fake_scope1"], default_scopes=["fake_scope2"]
  473. )
  474. assert credentials._target_scopes == ["fake_scope1"]
  475. def test_id_token_success(
  476. self, mock_donor_credentials, mock_authorizedsession_idtoken
  477. ):
  478. credentials = self.make_credentials(lifetime=None)
  479. token = "token"
  480. target_audience = "https://foo.bar"
  481. expire_time = (
  482. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  483. ).isoformat("T") + "Z"
  484. response_body = {"accessToken": token, "expireTime": expire_time}
  485. request = self.make_request(
  486. data=json.dumps(response_body), status=http_client.OK
  487. )
  488. credentials.refresh(request)
  489. assert credentials.valid
  490. assert not credentials.expired
  491. id_creds = impersonated_credentials.IDTokenCredentials(
  492. credentials, target_audience=target_audience
  493. )
  494. id_creds.refresh(request)
  495. assert id_creds.token == ID_TOKEN_DATA
  496. assert id_creds.expiry == datetime.datetime.utcfromtimestamp(ID_TOKEN_EXPIRY)
  497. def test_id_token_metrics(self, mock_donor_credentials):
  498. credentials = self.make_credentials(lifetime=None)
  499. credentials.token = "token"
  500. credentials.expiry = None
  501. target_audience = "https://foo.bar"
  502. id_creds = impersonated_credentials.IDTokenCredentials(
  503. credentials, target_audience=target_audience
  504. )
  505. with mock.patch(
  506. "google.auth.metrics.token_request_id_token_impersonate",
  507. return_value=ID_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  508. ):
  509. with mock.patch(
  510. "google.auth.transport.requests.AuthorizedSession.post", autospec=True
  511. ) as mock_post:
  512. data = {"token": ID_TOKEN_DATA}
  513. mock_post.return_value = MockResponse(data, http_client.OK)
  514. id_creds.refresh(None)
  515. assert id_creds.token == ID_TOKEN_DATA
  516. assert id_creds.expiry == datetime.datetime.utcfromtimestamp(
  517. ID_TOKEN_EXPIRY
  518. )
  519. assert (
  520. mock_post.call_args.kwargs["headers"]["x-goog-api-client"]
  521. == ID_TOKEN_REQUEST_METRICS_HEADER_VALUE
  522. )
  523. def test_id_token_from_credential(
  524. self, mock_donor_credentials, mock_authorizedsession_idtoken
  525. ):
  526. credentials = self.make_credentials(lifetime=None)
  527. target_credentials = self.make_credentials(lifetime=None)
  528. expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:generateIdToken"
  529. self._test_id_token_helper(
  530. credentials,
  531. target_credentials,
  532. mock_donor_credentials,
  533. mock_authorizedsession_idtoken,
  534. expected_url,
  535. )
  536. def test_id_token_from_credential_nonGdu(
  537. self, mock_donor_credentials, mock_authorizedsession_idtoken
  538. ):
  539. source_credentials = service_account.Credentials(
  540. SIGNER, "some@email.com", TOKEN_URI, universe_domain="foo.bar"
  541. )
  542. credentials = self.make_credentials(
  543. lifetime=None, source_credentials=source_credentials
  544. )
  545. target_credentials = self.make_credentials(
  546. lifetime=None, source_credentials=source_credentials
  547. )
  548. expected_url = "https://iamcredentials.foo.bar/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:generateIdToken"
  549. self._test_id_token_helper(
  550. credentials,
  551. target_credentials,
  552. mock_donor_credentials,
  553. mock_authorizedsession_idtoken,
  554. expected_url,
  555. )
  556. def _test_id_token_helper(
  557. self,
  558. credentials,
  559. target_credentials,
  560. mock_donor_credentials,
  561. mock_authorizedsession_idtoken,
  562. expected_url,
  563. ):
  564. token = "token"
  565. target_audience = "https://foo.bar"
  566. expire_time = (
  567. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  568. ).isoformat("T") + "Z"
  569. response_body = {"accessToken": token, "expireTime": expire_time}
  570. request = self.make_request(
  571. data=json.dumps(response_body), status=http_client.OK
  572. )
  573. credentials.refresh(request)
  574. assert credentials.valid
  575. assert not credentials.expired
  576. id_creds = impersonated_credentials.IDTokenCredentials(
  577. credentials, target_audience=target_audience, include_email=True
  578. )
  579. id_creds = id_creds.from_credentials(target_credentials=target_credentials)
  580. id_creds.refresh(request)
  581. args = mock_authorizedsession_idtoken.call_args.args
  582. assert args[2] == expected_url
  583. assert id_creds.token == ID_TOKEN_DATA
  584. assert id_creds._include_email is True
  585. assert id_creds._target_credentials is target_credentials
  586. def test_id_token_with_target_audience(
  587. self, mock_donor_credentials, mock_authorizedsession_idtoken
  588. ):
  589. credentials = self.make_credentials(lifetime=None)
  590. token = "token"
  591. target_audience = "https://foo.bar"
  592. expire_time = (
  593. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  594. ).isoformat("T") + "Z"
  595. response_body = {"accessToken": token, "expireTime": expire_time}
  596. request = self.make_request(
  597. data=json.dumps(response_body), status=http_client.OK
  598. )
  599. credentials.refresh(request)
  600. assert credentials.valid
  601. assert not credentials.expired
  602. id_creds = impersonated_credentials.IDTokenCredentials(
  603. credentials, include_email=True
  604. )
  605. id_creds = id_creds.with_target_audience(target_audience=target_audience)
  606. id_creds.refresh(request)
  607. assert id_creds.token == ID_TOKEN_DATA
  608. assert id_creds.expiry == datetime.datetime.utcfromtimestamp(ID_TOKEN_EXPIRY)
  609. assert id_creds._include_email is True
  610. def test_id_token_invalid_cred(
  611. self, mock_donor_credentials, mock_authorizedsession_idtoken
  612. ):
  613. credentials = None
  614. with pytest.raises(exceptions.GoogleAuthError) as excinfo:
  615. impersonated_credentials.IDTokenCredentials(credentials)
  616. assert excinfo.match("Provided Credential must be" " impersonated_credentials")
  617. def test_id_token_with_include_email(
  618. self, mock_donor_credentials, mock_authorizedsession_idtoken
  619. ):
  620. credentials = self.make_credentials(lifetime=None)
  621. token = "token"
  622. target_audience = "https://foo.bar"
  623. expire_time = (
  624. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  625. ).isoformat("T") + "Z"
  626. response_body = {"accessToken": token, "expireTime": expire_time}
  627. request = self.make_request(
  628. data=json.dumps(response_body), status=http_client.OK
  629. )
  630. credentials.refresh(request)
  631. assert credentials.valid
  632. assert not credentials.expired
  633. id_creds = impersonated_credentials.IDTokenCredentials(
  634. credentials, target_audience=target_audience
  635. )
  636. id_creds = id_creds.with_include_email(True)
  637. id_creds.refresh(request)
  638. assert id_creds.token == ID_TOKEN_DATA
  639. def test_id_token_with_quota_project(
  640. self, mock_donor_credentials, mock_authorizedsession_idtoken
  641. ):
  642. credentials = self.make_credentials(lifetime=None)
  643. token = "token"
  644. target_audience = "https://foo.bar"
  645. expire_time = (
  646. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  647. ).isoformat("T") + "Z"
  648. response_body = {"accessToken": token, "expireTime": expire_time}
  649. request = self.make_request(
  650. data=json.dumps(response_body), status=http_client.OK
  651. )
  652. credentials.refresh(request)
  653. assert credentials.valid
  654. assert not credentials.expired
  655. id_creds = impersonated_credentials.IDTokenCredentials(
  656. credentials, target_audience=target_audience
  657. )
  658. id_creds = id_creds.with_quota_project("project-foo")
  659. id_creds.refresh(request)
  660. assert id_creds.quota_project_id == "project-foo"