test_impersonated_credentials.py 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933
  1. # Copyright 2018 Google Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import datetime
  15. import http.client as http_client
  16. import json
  17. import os
  18. import mock
  19. import pytest # type: ignore
  20. from google.auth import _helpers
  21. from google.auth import crypt
  22. from google.auth import exceptions
  23. from google.auth import impersonated_credentials
  24. from google.auth import transport
  25. from google.auth.impersonated_credentials import Credentials
  26. from google.oauth2 import credentials
  27. from google.oauth2 import service_account
  28. import yatest.common as yc
  29. DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "data")
  30. with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
  31. PRIVATE_KEY_BYTES = fh.read()
  32. SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
  33. ID_TOKEN_DATA = (
  34. "eyJhbGciOiJSUzI1NiIsImtpZCI6ImRmMzc1ODkwOGI3OTIyOTNhZDk3N2Ew"
  35. "Yjk5MWQ5OGE3N2Y0ZWVlY2QiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwc"
  36. "zovL2Zvby5iYXIiLCJhenAiOiIxMDIxMDE1NTA4MzQyMDA3MDg1NjgiLCJle"
  37. "HAiOjE1NjQ0NzUwNTEsImlhdCI6MTU2NDQ3MTQ1MSwiaXNzIjoiaHR0cHM6L"
  38. "y9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTAyMTAxNTUwODM0MjAwN"
  39. "zA4NTY4In0.redacted"
  40. )
  41. ID_TOKEN_EXPIRY = 1564475051
  42. with open(SERVICE_ACCOUNT_JSON_FILE, "rb") as fh:
  43. SERVICE_ACCOUNT_INFO = json.load(fh)
  44. SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
  45. TOKEN_URI = "https://example.com/oauth2/token"
  46. ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
  47. "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/imp"
  48. )
  49. ID_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
  50. "gl-python/3.7 auth/1.1 auth-request-type/it cred-type/imp"
  51. )
  52. @pytest.fixture
  53. def mock_donor_credentials():
  54. with mock.patch("google.oauth2._client.jwt_grant", autospec=True) as grant:
  55. grant.return_value = (
  56. "source token",
  57. _helpers.utcnow() + datetime.timedelta(seconds=500),
  58. {},
  59. )
  60. yield grant
  61. @pytest.fixture
  62. def mock_dwd_credentials():
  63. with mock.patch("google.oauth2._client.jwt_grant", autospec=True) as grant:
  64. grant.return_value = (
  65. "1/fFAGRNJasdfz70BzhT3Zg",
  66. _helpers.utcnow() + datetime.timedelta(seconds=500),
  67. {},
  68. )
  69. yield grant
  70. class MockResponse:
  71. def __init__(self, json_data, status_code):
  72. self.json_data = json_data
  73. self.status_code = status_code
  74. def json(self):
  75. return self.json_data
  76. @pytest.fixture
  77. def mock_authorizedsession_sign():
  78. with mock.patch(
  79. "google.auth.transport.requests.AuthorizedSession.request", autospec=True
  80. ) as auth_session:
  81. data = {"keyId": "1", "signedBlob": "c2lnbmF0dXJl"}
  82. auth_session.return_value = MockResponse(data, http_client.OK)
  83. yield auth_session
  84. @pytest.fixture
  85. def mock_authorizedsession_idtoken():
  86. with mock.patch(
  87. "google.auth.transport.requests.AuthorizedSession.request", autospec=True
  88. ) as auth_session:
  89. data = {"token": ID_TOKEN_DATA}
  90. auth_session.return_value = MockResponse(data, http_client.OK)
  91. yield auth_session
  92. class TestImpersonatedCredentials(object):
  93. SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
  94. TARGET_PRINCIPAL = "impersonated@project.iam.gserviceaccount.com"
  95. TARGET_SCOPES = ["https://www.googleapis.com/auth/devstorage.read_only"]
  96. # DELEGATES: List[str] = []
  97. # Because Python 2.7:
  98. DELEGATES = [] # type: ignore
  99. LIFETIME = 3600
  100. SOURCE_CREDENTIALS = service_account.Credentials(
  101. SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI
  102. )
  103. USER_SOURCE_CREDENTIALS = credentials.Credentials(token="ABCDE")
  104. IAM_ENDPOINT_OVERRIDE = (
  105. "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
  106. + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
  107. )
  108. def make_credentials(
  109. self,
  110. source_credentials=SOURCE_CREDENTIALS,
  111. lifetime=LIFETIME,
  112. target_principal=TARGET_PRINCIPAL,
  113. subject=None,
  114. iam_endpoint_override=None,
  115. ):
  116. return Credentials(
  117. source_credentials=source_credentials,
  118. target_principal=target_principal,
  119. target_scopes=self.TARGET_SCOPES,
  120. delegates=self.DELEGATES,
  121. lifetime=lifetime,
  122. subject=subject,
  123. iam_endpoint_override=iam_endpoint_override,
  124. )
  125. def test_get_cred_info(self):
  126. credentials = self.make_credentials()
  127. assert not credentials.get_cred_info()
  128. credentials._cred_file_path = "/path/to/file"
  129. assert credentials.get_cred_info() == {
  130. "credential_source": "/path/to/file",
  131. "credential_type": "impersonated credentials",
  132. "principal": "impersonated@project.iam.gserviceaccount.com",
  133. }
  134. def test_universe_domain_matching_source(self):
  135. source_credentials = service_account.Credentials(
  136. SIGNER, "some@email.com", TOKEN_URI, universe_domain="foo.bar"
  137. )
  138. credentials = self.make_credentials(source_credentials=source_credentials)
  139. assert credentials.universe_domain == "foo.bar"
  140. def test__make_copy_get_cred_info(self):
  141. credentials = self.make_credentials()
  142. credentials._cred_file_path = "/path/to/file"
  143. cred_copy = credentials._make_copy()
  144. assert cred_copy._cred_file_path == "/path/to/file"
  145. def test_make_from_user_credentials(self):
  146. credentials = self.make_credentials(
  147. source_credentials=self.USER_SOURCE_CREDENTIALS
  148. )
  149. assert not credentials.valid
  150. assert credentials.expired
  151. def test_default_state(self):
  152. credentials = self.make_credentials()
  153. assert not credentials.valid
  154. assert credentials.expired
  155. def test_make_from_service_account_self_signed_jwt(self):
  156. source_credentials = service_account.Credentials(
  157. SIGNER, self.SERVICE_ACCOUNT_EMAIL, TOKEN_URI, always_use_jwt_access=True
  158. )
  159. credentials = self.make_credentials(source_credentials=source_credentials)
  160. # test the source credential don't lose self signed jwt setting
  161. assert credentials._source_credentials._always_use_jwt_access
  162. assert credentials._source_credentials._jwt_credentials
  163. def make_request(
  164. self,
  165. data,
  166. status=http_client.OK,
  167. headers=None,
  168. side_effect=None,
  169. use_data_bytes=True,
  170. ):
  171. response = mock.create_autospec(transport.Response, instance=False)
  172. response.status = status
  173. response.data = _helpers.to_bytes(data) if use_data_bytes else data
  174. response.headers = headers or {}
  175. request = mock.create_autospec(transport.Request, instance=False)
  176. request.side_effect = side_effect
  177. request.return_value = response
  178. return request
  179. def test_token_usage_metrics(self):
  180. credentials = self.make_credentials()
  181. credentials.token = "token"
  182. credentials.expiry = None
  183. headers = {}
  184. credentials.before_request(mock.Mock(), None, None, headers)
  185. assert headers["authorization"] == "Bearer token"
  186. assert headers["x-goog-api-client"] == "cred-type/imp"
  187. @pytest.mark.parametrize("use_data_bytes", [True, False])
  188. def test_refresh_success(self, use_data_bytes, mock_donor_credentials):
  189. credentials = self.make_credentials(lifetime=None)
  190. token = "token"
  191. expire_time = (
  192. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  193. ).isoformat("T") + "Z"
  194. response_body = {"accessToken": token, "expireTime": expire_time}
  195. request = self.make_request(
  196. data=json.dumps(response_body),
  197. status=http_client.OK,
  198. use_data_bytes=use_data_bytes,
  199. )
  200. with mock.patch(
  201. "google.auth.metrics.token_request_access_token_impersonate",
  202. return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  203. ):
  204. credentials.refresh(request)
  205. assert credentials.valid
  206. assert not credentials.expired
  207. assert (
  208. request.call_args.kwargs["headers"]["x-goog-api-client"]
  209. == ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE
  210. )
  211. @pytest.mark.parametrize("use_data_bytes", [True, False])
  212. def test_refresh_with_subject_success(self, use_data_bytes, mock_dwd_credentials):
  213. credentials = self.make_credentials(subject="test@email.com", lifetime=None)
  214. response_body = {"signedJwt": "example_signed_jwt"}
  215. request = self.make_request(
  216. data=json.dumps(response_body),
  217. status=http_client.OK,
  218. use_data_bytes=use_data_bytes,
  219. )
  220. with mock.patch(
  221. "google.auth.metrics.token_request_access_token_impersonate",
  222. return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  223. ):
  224. credentials.refresh(request)
  225. assert credentials.valid
  226. assert not credentials.expired
  227. assert credentials.token == "1/fFAGRNJasdfz70BzhT3Zg"
  228. @pytest.mark.parametrize("use_data_bytes", [True, False])
  229. def test_refresh_success_nonGdu(self, use_data_bytes, mock_donor_credentials):
  230. source_credentials = service_account.Credentials(
  231. SIGNER, "some@email.com", TOKEN_URI, universe_domain="foo.bar"
  232. )
  233. credentials = self.make_credentials(
  234. lifetime=None, source_credentials=source_credentials
  235. )
  236. token = "token"
  237. expire_time = (
  238. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  239. ).isoformat("T") + "Z"
  240. response_body = {"accessToken": token, "expireTime": expire_time}
  241. request = self.make_request(
  242. data=json.dumps(response_body),
  243. status=http_client.OK,
  244. use_data_bytes=use_data_bytes,
  245. )
  246. credentials.refresh(request)
  247. assert credentials.valid
  248. assert not credentials.expired
  249. # Confirm override endpoint used.
  250. request_kwargs = request.call_args[1]
  251. assert (
  252. request_kwargs["url"]
  253. == "https://iamcredentials.foo.bar/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:generateAccessToken"
  254. )
  255. @pytest.mark.parametrize("use_data_bytes", [True, False])
  256. def test_refresh_success_iam_endpoint_override(
  257. self, use_data_bytes, mock_donor_credentials
  258. ):
  259. credentials = self.make_credentials(
  260. lifetime=None, iam_endpoint_override=self.IAM_ENDPOINT_OVERRIDE
  261. )
  262. token = "token"
  263. expire_time = (
  264. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  265. ).isoformat("T") + "Z"
  266. response_body = {"accessToken": token, "expireTime": expire_time}
  267. request = self.make_request(
  268. data=json.dumps(response_body),
  269. status=http_client.OK,
  270. use_data_bytes=use_data_bytes,
  271. )
  272. credentials.refresh(request)
  273. assert credentials.valid
  274. assert not credentials.expired
  275. # Confirm override endpoint used.
  276. request_kwargs = request.call_args[1]
  277. assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE
  278. @pytest.mark.parametrize("time_skew", [150, -150])
  279. def test_refresh_source_credentials(self, time_skew):
  280. credentials = self.make_credentials(lifetime=None)
  281. # Source credentials is refreshed only if it is expired within
  282. # _helpers.REFRESH_THRESHOLD from now. We add a time_skew to the expiry, so
  283. # source credentials is refreshed only if time_skew <= 0.
  284. credentials._source_credentials.expiry = (
  285. _helpers.utcnow()
  286. + _helpers.REFRESH_THRESHOLD
  287. + datetime.timedelta(seconds=time_skew)
  288. )
  289. credentials._source_credentials.token = "Token"
  290. with mock.patch(
  291. "google.oauth2.service_account.Credentials.refresh", autospec=True
  292. ) as source_cred_refresh:
  293. expire_time = (
  294. _helpers.utcnow().replace(microsecond=0)
  295. + datetime.timedelta(seconds=500)
  296. ).isoformat("T") + "Z"
  297. response_body = {"accessToken": "token", "expireTime": expire_time}
  298. request = self.make_request(
  299. data=json.dumps(response_body), status=http_client.OK
  300. )
  301. credentials.refresh(request)
  302. assert credentials.valid
  303. assert not credentials.expired
  304. # Source credentials is refreshed only if it is expired within
  305. # _helpers.REFRESH_THRESHOLD
  306. if time_skew > 0:
  307. source_cred_refresh.assert_not_called()
  308. else:
  309. source_cred_refresh.assert_called_once()
  310. def test_refresh_failure_malformed_expire_time(self, mock_donor_credentials):
  311. credentials = self.make_credentials(lifetime=None)
  312. token = "token"
  313. expire_time = (_helpers.utcnow() + datetime.timedelta(seconds=500)).isoformat(
  314. "T"
  315. )
  316. response_body = {"accessToken": token, "expireTime": expire_time}
  317. request = self.make_request(
  318. data=json.dumps(response_body), status=http_client.OK
  319. )
  320. with pytest.raises(exceptions.RefreshError) as excinfo:
  321. credentials.refresh(request)
  322. assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
  323. assert not credentials.valid
  324. assert credentials.expired
  325. def test_refresh_failure_unauthorzed(self, mock_donor_credentials):
  326. credentials = self.make_credentials(lifetime=None)
  327. response_body = {
  328. "error": {
  329. "code": 403,
  330. "message": "The caller does not have permission",
  331. "status": "PERMISSION_DENIED",
  332. }
  333. }
  334. request = self.make_request(
  335. data=json.dumps(response_body), status=http_client.UNAUTHORIZED
  336. )
  337. with pytest.raises(exceptions.RefreshError) as excinfo:
  338. credentials.refresh(request)
  339. assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
  340. assert not credentials.valid
  341. assert credentials.expired
  342. def test_refresh_failure(self):
  343. credentials = self.make_credentials(lifetime=None)
  344. credentials.expiry = None
  345. credentials.token = "token"
  346. id_creds = impersonated_credentials.IDTokenCredentials(
  347. credentials, target_audience="audience"
  348. )
  349. response = mock.create_autospec(transport.Response, instance=False)
  350. response.status_code = http_client.UNAUTHORIZED
  351. response.json = mock.Mock(return_value="failed to get ID token")
  352. with mock.patch(
  353. "google.auth.transport.requests.AuthorizedSession.post",
  354. return_value=response,
  355. ):
  356. with pytest.raises(exceptions.RefreshError) as excinfo:
  357. id_creds.refresh(None)
  358. assert excinfo.match("Error getting ID token")
  359. def test_refresh_failure_http_error(self, mock_donor_credentials):
  360. credentials = self.make_credentials(lifetime=None)
  361. response_body = {}
  362. request = self.make_request(
  363. data=json.dumps(response_body), status=http_client.HTTPException
  364. )
  365. with pytest.raises(exceptions.RefreshError) as excinfo:
  366. credentials.refresh(request)
  367. assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
  368. assert not credentials.valid
  369. assert credentials.expired
  370. def test_refresh_failure_subject_with_nondefault_domain(
  371. self, mock_donor_credentials
  372. ):
  373. source_credentials = service_account.Credentials(
  374. SIGNER, "some@email.com", TOKEN_URI, universe_domain="foo.bar"
  375. )
  376. credentials = self.make_credentials(
  377. source_credentials=source_credentials, subject="test@email.com"
  378. )
  379. expire_time = (_helpers.utcnow().replace(microsecond=0)).isoformat("T") + "Z"
  380. response_body = {"accessToken": "token", "expireTime": expire_time}
  381. request = self.make_request(
  382. data=json.dumps(response_body), status=http_client.OK
  383. )
  384. with pytest.raises(exceptions.GoogleAuthError) as excinfo:
  385. credentials.refresh(request)
  386. assert excinfo.match(
  387. "Domain-wide delegation is not supported in universes other "
  388. + "than googleapis.com"
  389. )
  390. assert not credentials.valid
  391. assert credentials.expired
  392. def test_expired(self):
  393. credentials = self.make_credentials(lifetime=None)
  394. assert credentials.expired
  395. def test_signer(self):
  396. credentials = self.make_credentials()
  397. assert isinstance(credentials.signer, impersonated_credentials.Credentials)
  398. def test_signer_email(self):
  399. credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL)
  400. assert credentials.signer_email == self.TARGET_PRINCIPAL
  401. def test_service_account_email(self):
  402. credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL)
  403. assert credentials.service_account_email == self.TARGET_PRINCIPAL
  404. def test_sign_bytes(self, mock_donor_credentials, mock_authorizedsession_sign):
  405. credentials = self.make_credentials(lifetime=None)
  406. expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:signBlob"
  407. self._sign_bytes_helper(
  408. credentials,
  409. mock_donor_credentials,
  410. mock_authorizedsession_sign,
  411. expected_url,
  412. )
  413. def test_sign_bytes_nonGdu(
  414. self, mock_donor_credentials, mock_authorizedsession_sign
  415. ):
  416. source_credentials = service_account.Credentials(
  417. SIGNER, "some@email.com", TOKEN_URI, universe_domain="foo.bar"
  418. )
  419. credentials = self.make_credentials(
  420. lifetime=None, source_credentials=source_credentials
  421. )
  422. expected_url = "https://iamcredentials.foo.bar/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:signBlob"
  423. self._sign_bytes_helper(
  424. credentials,
  425. mock_donor_credentials,
  426. mock_authorizedsession_sign,
  427. expected_url,
  428. )
  429. def _sign_bytes_helper(
  430. self,
  431. credentials,
  432. mock_donor_credentials,
  433. mock_authorizedsession_sign,
  434. expected_url,
  435. ):
  436. token = "token"
  437. expire_time = (
  438. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  439. ).isoformat("T") + "Z"
  440. token_response_body = {"accessToken": token, "expireTime": expire_time}
  441. response = mock.create_autospec(transport.Response, instance=False)
  442. response.status = http_client.OK
  443. response.data = _helpers.to_bytes(json.dumps(token_response_body))
  444. request = mock.create_autospec(transport.Request, instance=False)
  445. request.return_value = response
  446. credentials.refresh(request)
  447. assert credentials.valid
  448. assert not credentials.expired
  449. signature = credentials.sign_bytes(b"signed bytes")
  450. mock_authorizedsession_sign.assert_called_with(
  451. mock.ANY,
  452. "POST",
  453. expected_url,
  454. None,
  455. json={"payload": "c2lnbmVkIGJ5dGVz", "delegates": []},
  456. headers={"Content-Type": "application/json"},
  457. )
  458. assert signature == b"signature"
  459. def test_sign_bytes_failure(self):
  460. credentials = self.make_credentials(lifetime=None)
  461. with mock.patch(
  462. "google.auth.transport.requests.AuthorizedSession.request", autospec=True
  463. ) as auth_session:
  464. data = {"error": {"code": 403, "message": "unauthorized"}}
  465. mock_response = MockResponse(data, http_client.UNAUTHORIZED)
  466. auth_session.return_value = mock_response
  467. with pytest.raises(exceptions.TransportError) as excinfo:
  468. credentials.sign_bytes(b"foo")
  469. assert excinfo.match("'code': 403")
  470. @mock.patch("time.sleep", return_value=None)
  471. def test_sign_bytes_retryable_failure(self, mock_time):
  472. credentials = self.make_credentials(lifetime=None)
  473. with mock.patch(
  474. "google.auth.transport.requests.AuthorizedSession.request", autospec=True
  475. ) as auth_session:
  476. data = {"error": {"code": 500, "message": "internal_failure"}}
  477. mock_response = MockResponse(data, http_client.INTERNAL_SERVER_ERROR)
  478. auth_session.return_value = mock_response
  479. with pytest.raises(exceptions.TransportError) as excinfo:
  480. credentials.sign_bytes(b"foo")
  481. assert excinfo.match("exhausted signBlob endpoint retries")
  482. def test_with_quota_project(self):
  483. credentials = self.make_credentials()
  484. quota_project_creds = credentials.with_quota_project("project-foo")
  485. assert quota_project_creds._quota_project_id == "project-foo"
  486. @pytest.mark.parametrize("use_data_bytes", [True, False])
  487. def test_with_quota_project_iam_endpoint_override(
  488. self, use_data_bytes, mock_donor_credentials
  489. ):
  490. credentials = self.make_credentials(
  491. lifetime=None, iam_endpoint_override=self.IAM_ENDPOINT_OVERRIDE
  492. )
  493. token = "token"
  494. # iam_endpoint_override should be copied to created credentials.
  495. quota_project_creds = credentials.with_quota_project("project-foo")
  496. expire_time = (
  497. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  498. ).isoformat("T") + "Z"
  499. response_body = {"accessToken": token, "expireTime": expire_time}
  500. request = self.make_request(
  501. data=json.dumps(response_body),
  502. status=http_client.OK,
  503. use_data_bytes=use_data_bytes,
  504. )
  505. quota_project_creds.refresh(request)
  506. assert quota_project_creds.valid
  507. assert not quota_project_creds.expired
  508. # Confirm override endpoint used.
  509. request_kwargs = request.call_args[1]
  510. assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE
  511. def test_with_scopes(self):
  512. credentials = self.make_credentials()
  513. credentials._target_scopes = []
  514. assert credentials.requires_scopes is True
  515. credentials = credentials.with_scopes(["fake_scope1", "fake_scope2"])
  516. assert credentials.requires_scopes is False
  517. assert credentials._target_scopes == ["fake_scope1", "fake_scope2"]
  518. def test_with_scopes_provide_default_scopes(self):
  519. credentials = self.make_credentials()
  520. credentials._target_scopes = []
  521. credentials = credentials.with_scopes(
  522. ["fake_scope1"], default_scopes=["fake_scope2"]
  523. )
  524. assert credentials._target_scopes == ["fake_scope1"]
  525. def test_id_token_success(
  526. self, mock_donor_credentials, mock_authorizedsession_idtoken
  527. ):
  528. credentials = self.make_credentials(lifetime=None)
  529. token = "token"
  530. target_audience = "https://foo.bar"
  531. expire_time = (
  532. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  533. ).isoformat("T") + "Z"
  534. response_body = {"accessToken": token, "expireTime": expire_time}
  535. request = self.make_request(
  536. data=json.dumps(response_body), status=http_client.OK
  537. )
  538. credentials.refresh(request)
  539. assert credentials.valid
  540. assert not credentials.expired
  541. id_creds = impersonated_credentials.IDTokenCredentials(
  542. credentials, target_audience=target_audience
  543. )
  544. id_creds.refresh(request)
  545. assert id_creds.token == ID_TOKEN_DATA
  546. assert id_creds.expiry == datetime.datetime.utcfromtimestamp(ID_TOKEN_EXPIRY)
  547. def test_id_token_metrics(self, mock_donor_credentials):
  548. credentials = self.make_credentials(lifetime=None)
  549. credentials.token = "token"
  550. credentials.expiry = None
  551. target_audience = "https://foo.bar"
  552. id_creds = impersonated_credentials.IDTokenCredentials(
  553. credentials, target_audience=target_audience
  554. )
  555. with mock.patch(
  556. "google.auth.metrics.token_request_id_token_impersonate",
  557. return_value=ID_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  558. ):
  559. with mock.patch(
  560. "google.auth.transport.requests.AuthorizedSession.post", autospec=True
  561. ) as mock_post:
  562. data = {"token": ID_TOKEN_DATA}
  563. mock_post.return_value = MockResponse(data, http_client.OK)
  564. id_creds.refresh(None)
  565. assert id_creds.token == ID_TOKEN_DATA
  566. assert id_creds.expiry == datetime.datetime.utcfromtimestamp(
  567. ID_TOKEN_EXPIRY
  568. )
  569. assert (
  570. mock_post.call_args.kwargs["headers"]["x-goog-api-client"]
  571. == ID_TOKEN_REQUEST_METRICS_HEADER_VALUE
  572. )
  573. def test_id_token_from_credential(
  574. self, mock_donor_credentials, mock_authorizedsession_idtoken
  575. ):
  576. credentials = self.make_credentials(lifetime=None)
  577. target_credentials = self.make_credentials(lifetime=None)
  578. expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:generateIdToken"
  579. self._test_id_token_helper(
  580. credentials,
  581. target_credentials,
  582. mock_donor_credentials,
  583. mock_authorizedsession_idtoken,
  584. expected_url,
  585. )
  586. def test_id_token_from_credential_nonGdu(
  587. self, mock_donor_credentials, mock_authorizedsession_idtoken
  588. ):
  589. source_credentials = service_account.Credentials(
  590. SIGNER, "some@email.com", TOKEN_URI, universe_domain="foo.bar"
  591. )
  592. credentials = self.make_credentials(
  593. lifetime=None, source_credentials=source_credentials
  594. )
  595. target_credentials = self.make_credentials(
  596. lifetime=None, source_credentials=source_credentials
  597. )
  598. expected_url = "https://iamcredentials.foo.bar/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:generateIdToken"
  599. self._test_id_token_helper(
  600. credentials,
  601. target_credentials,
  602. mock_donor_credentials,
  603. mock_authorizedsession_idtoken,
  604. expected_url,
  605. )
  606. def _test_id_token_helper(
  607. self,
  608. credentials,
  609. target_credentials,
  610. mock_donor_credentials,
  611. mock_authorizedsession_idtoken,
  612. expected_url,
  613. ):
  614. token = "token"
  615. target_audience = "https://foo.bar"
  616. expire_time = (
  617. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  618. ).isoformat("T") + "Z"
  619. response_body = {"accessToken": token, "expireTime": expire_time}
  620. request = self.make_request(
  621. data=json.dumps(response_body), status=http_client.OK
  622. )
  623. credentials.refresh(request)
  624. assert credentials.valid
  625. assert not credentials.expired
  626. id_creds = impersonated_credentials.IDTokenCredentials(
  627. credentials, target_audience=target_audience, include_email=True
  628. )
  629. id_creds = id_creds.from_credentials(target_credentials=target_credentials)
  630. id_creds.refresh(request)
  631. args = mock_authorizedsession_idtoken.call_args.args
  632. assert args[2] == expected_url
  633. assert id_creds.token == ID_TOKEN_DATA
  634. assert id_creds._include_email is True
  635. assert id_creds._target_credentials is target_credentials
  636. def test_id_token_with_target_audience(
  637. self, mock_donor_credentials, mock_authorizedsession_idtoken
  638. ):
  639. credentials = self.make_credentials(lifetime=None)
  640. token = "token"
  641. target_audience = "https://foo.bar"
  642. expire_time = (
  643. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  644. ).isoformat("T") + "Z"
  645. response_body = {"accessToken": token, "expireTime": expire_time}
  646. request = self.make_request(
  647. data=json.dumps(response_body), status=http_client.OK
  648. )
  649. credentials.refresh(request)
  650. assert credentials.valid
  651. assert not credentials.expired
  652. id_creds = impersonated_credentials.IDTokenCredentials(
  653. credentials, include_email=True
  654. )
  655. id_creds = id_creds.with_target_audience(target_audience=target_audience)
  656. id_creds.refresh(request)
  657. assert id_creds.token == ID_TOKEN_DATA
  658. assert id_creds.expiry == datetime.datetime.utcfromtimestamp(ID_TOKEN_EXPIRY)
  659. assert id_creds._include_email is True
  660. def test_id_token_invalid_cred(
  661. self, mock_donor_credentials, mock_authorizedsession_idtoken
  662. ):
  663. credentials = None
  664. with pytest.raises(exceptions.GoogleAuthError) as excinfo:
  665. impersonated_credentials.IDTokenCredentials(credentials)
  666. assert excinfo.match("Provided Credential must be" " impersonated_credentials")
  667. def test_id_token_with_include_email(
  668. self, mock_donor_credentials, mock_authorizedsession_idtoken
  669. ):
  670. credentials = self.make_credentials(lifetime=None)
  671. token = "token"
  672. target_audience = "https://foo.bar"
  673. expire_time = (
  674. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  675. ).isoformat("T") + "Z"
  676. response_body = {"accessToken": token, "expireTime": expire_time}
  677. request = self.make_request(
  678. data=json.dumps(response_body), status=http_client.OK
  679. )
  680. credentials.refresh(request)
  681. assert credentials.valid
  682. assert not credentials.expired
  683. id_creds = impersonated_credentials.IDTokenCredentials(
  684. credentials, target_audience=target_audience
  685. )
  686. id_creds = id_creds.with_include_email(True)
  687. id_creds.refresh(request)
  688. assert id_creds.token == ID_TOKEN_DATA
  689. def test_id_token_with_quota_project(
  690. self, mock_donor_credentials, mock_authorizedsession_idtoken
  691. ):
  692. credentials = self.make_credentials(lifetime=None)
  693. token = "token"
  694. target_audience = "https://foo.bar"
  695. expire_time = (
  696. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
  697. ).isoformat("T") + "Z"
  698. response_body = {"accessToken": token, "expireTime": expire_time}
  699. request = self.make_request(
  700. data=json.dumps(response_body), status=http_client.OK
  701. )
  702. credentials.refresh(request)
  703. assert credentials.valid
  704. assert not credentials.expired
  705. id_creds = impersonated_credentials.IDTokenCredentials(
  706. credentials, target_audience=target_audience
  707. )
  708. id_creds = id_creds.with_quota_project("project-foo")
  709. id_creds.refresh(request)
  710. assert id_creds.quota_project_id == "project-foo"
  711. def test_sign_jwt_request_success(self):
  712. principal = "foo@example.com"
  713. expected_signed_jwt = "correct_signed_jwt"
  714. response_body = {"keyId": "1", "signedJwt": expected_signed_jwt}
  715. request = self.make_request(
  716. data=json.dumps(response_body), status=http_client.OK
  717. )
  718. signed_jwt = impersonated_credentials._sign_jwt_request(
  719. request=request, principal=principal, headers={}, payload={}
  720. )
  721. assert signed_jwt == expected_signed_jwt
  722. request.assert_called_once_with(
  723. url="https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/foo@example.com:signJwt",
  724. method="POST",
  725. headers={},
  726. body=json.dumps({"delegates": [], "payload": json.dumps({})}).encode(
  727. "utf-8"
  728. ),
  729. )
  730. def test_sign_jwt_request_http_error(self):
  731. principal = "foo@example.com"
  732. request = self.make_request(
  733. data="error_message", status=http_client.BAD_REQUEST
  734. )
  735. with pytest.raises(exceptions.RefreshError) as excinfo:
  736. _ = impersonated_credentials._sign_jwt_request(
  737. request=request, principal=principal, headers={}, payload={}
  738. )
  739. assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
  740. assert excinfo.value.args[0] == "Unable to acquire impersonated credentials"
  741. assert excinfo.value.args[1] == "error_message"
  742. def test_sign_jwt_request_invalid_response_error(self):
  743. principal = "foo@example.com"
  744. request = self.make_request(data="invalid_data", status=http_client.OK)
  745. with pytest.raises(exceptions.RefreshError) as excinfo:
  746. _ = impersonated_credentials._sign_jwt_request(
  747. request=request, principal=principal, headers={}, payload={}
  748. )
  749. assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
  750. assert (
  751. excinfo.value.args[0]
  752. == "Unable to acquire impersonated credentials: No signed JWT in response."
  753. )
  754. assert excinfo.value.args[1] == "invalid_data"