test_identity_pool.py 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900
  1. # Copyright 2020 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import datetime
  15. import json
  16. import os
  17. import mock
  18. import pytest
  19. from six.moves import http_client
  20. from six.moves import urllib
  21. from google.auth import _helpers
  22. from google.auth import exceptions
  23. from google.auth import identity_pool
  24. from google.auth import transport
  # Client credentials; tests that pass these expect a "Basic" Authorization
  # header on the token exchange request.
  CLIENT_ID = "username"
  CLIENT_SECRET = "password"
  # Base64 encoding of "username:password".
  BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
  SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
  # generateAccessToken endpoint for SERVICE_ACCOUNT_EMAIL.
  SERVICE_ACCOUNT_IMPERSONATION_URL = (
      "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
      "/serviceAccounts/{}:generateAccessToken"
  ).format(SERVICE_ACCOUNT_EMAIL)
  QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
  SCOPES = ["scope1", "scope2"]
  36. import yatest.common
  # Fixture files holding the external subject token in text and JSON form.
  DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
  SUBJECT_TOKEN_TEXT_FILE = os.path.join(DATA_DIR, "external_subject_token.txt")
  SUBJECT_TOKEN_JSON_FILE = os.path.join(DATA_DIR, "external_subject_token.json")
  SUBJECT_TOKEN_FIELD_NAME = "access_token"

  # Load both fixtures once at import time so tests can compare against them.
  with open(SUBJECT_TOKEN_TEXT_FILE) as fh:
      TEXT_FILE_SUBJECT_TOKEN = fh.read()

  with open(SUBJECT_TOKEN_JSON_FILE) as fh:
      JSON_FILE_CONTENT = json.loads(fh.read())
  JSON_FILE_SUBJECT_TOKEN = JSON_FILE_CONTENT.get(SUBJECT_TOKEN_FIELD_NAME)

  # Token exchange parameters shared by every test in this module.
  TOKEN_URL = "https://sts.googleapis.com/v1/token"
  SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
  AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
  49. class TestCredentials(object):
  # File-backed credential source in the default (text) format.
  CREDENTIAL_SOURCE_TEXT = {"file": SUBJECT_TOKEN_TEXT_FILE}
  # File-backed credential source whose token is the "access_token" JSON field.
  CREDENTIAL_SOURCE_JSON = {
      "file": SUBJECT_TOKEN_JSON_FILE,
      "format": {
          "type": "json",
          "subject_token_field_name": "access_token",
      },
  }
  # URL-backed equivalents of the two sources above.
  CREDENTIAL_URL = "http://fakeurl.com"
  CREDENTIAL_SOURCE_TEXT_URL = {"url": CREDENTIAL_URL}
  CREDENTIAL_SOURCE_JSON_URL = {
      "url": CREDENTIAL_URL,
      "format": {
          "type": "json",
          "subject_token_field_name": "access_token",
      },
  }
  # Canned successful token exchange response used by the refresh helpers.
  SUCCESS_RESPONSE = {
      "access_token": "ACCESS_TOKEN",
      "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
      "token_type": "Bearer",
      "expires_in": 3600,
      "scope": " ".join(SCOPES),
  }
  @classmethod
  def make_mock_response(cls, status, data):
      """Build an autospec'd ``transport.Response`` with the given status and body.

      Dict payloads are serialized to UTF-8 encoded JSON; any other payload is
      stored on the response unchanged.
      """
      mocked = mock.create_autospec(transport.Response, instance=True)
      mocked.status = status
      mocked.data = (
          json.dumps(data).encode("utf-8") if isinstance(data, dict) else data
      )
      return mocked
  @classmethod
  def make_mock_request(
      cls, token_status=http_client.OK, token_data=None, *extra_requests
  ):
      """Build a mocked ``transport.Request`` that replays canned responses.

      The first response is built from ``token_status``/``token_data``. Any
      ``extra_requests`` are read as consecutive (status, data) pairs and
      queued after it, in order.
      """
      queued = [cls.make_mock_response(token_status, token_data)]
      remaining = extra_requests
      while remaining:
          # Consume the next (status, data) pair, e.g. the expected service
          # account impersonation response.
          status, data = remaining[0], remaining[1]
          remaining = remaining[2:]
          queued.append(cls.make_mock_response(status, data))

      mocked_request = mock.create_autospec(transport.Request)
      mocked_request.side_effect = queued
      return mocked_request
  @classmethod
  def assert_credential_request_kwargs(
      cls, request_kwargs, headers, url=CREDENTIAL_URL
  ):
      """Assert ``request_kwargs`` describe a body-less GET to ``url``.

      Checks the URL, the method, the headers and finally that no body was
      sent, in that order.
      """
      expected_fields = (("url", url), ("method", "GET"), ("headers", headers))
      for field, expected in expected_fields:
          assert request_kwargs[field] == expected
      assert request_kwargs.get("body") is None
  @classmethod
  def assert_token_request_kwargs(
      cls, request_kwargs, headers, request_data, token_url=TOKEN_URL
  ):
      """Assert ``request_kwargs`` describe the expected token exchange POST.

      The form-encoded body is parsed and every key/value pair is compared,
      after UTF-8 decoding, against ``request_data``.
      """
      expected_fields = (
          ("url", token_url),
          ("method", "POST"),
          ("headers", headers),
      )
      for field, expected in expected_fields:
          assert request_kwargs[field] == expected

      body = request_kwargs["body"]
      assert body is not None
      form_pairs = urllib.parse.parse_qsl(body)
      assert len(form_pairs) == len(request_data)
      for raw_key, raw_value in form_pairs:
          assert raw_value.decode("utf-8") == request_data[raw_key.decode("utf-8")]
  @classmethod
  def assert_impersonation_request_kwargs(
      cls,
      request_kwargs,
      headers,
      request_data,
      service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
  ):
      """Assert ``request_kwargs`` describe the expected impersonation POST.

      The body is decoded as UTF-8 JSON and compared with ``request_data``.
      """
      expected_fields = (
          ("url", service_account_impersonation_url),
          ("method", "POST"),
          ("headers", headers),
      )
      for field, expected in expected_fields:
          assert request_kwargs[field] == expected

      body = request_kwargs["body"]
      assert body is not None
      assert json.loads(body.decode("utf-8")) == request_data
  @classmethod
  def assert_underlying_credentials_refresh(
      cls,
      credentials,
      audience,
      subject_token,
      subject_token_type,
      token_url,
      service_account_impersonation_url=None,
      basic_auth_encoding=None,
      quota_project_id=None,
      used_scopes=None,
      credential_data=None,
      scopes=None,
      default_scopes=None,
  ):
      """Refresh ``credentials`` against a mocked transport and verify the result.

      The mocked request replays, in order: the credential retrieval response
      (only when ``credential_data`` is given), the token exchange response
      and, when ``service_account_impersonation_url`` is set, the
      impersonation response. After ``refresh`` the helper checks each
      recorded request and the resulting token, quota project and scopes.
      """
      impersonating = bool(service_account_impersonation_url)

      # Expected token exchange request and its canned response.
      sts_response = cls.SUCCESS_RESPONSE.copy()
      sts_headers = {"Content-Type": "application/x-www-form-urlencoded"}
      if basic_auth_encoding:
          sts_headers["Authorization"] = "Basic " + basic_auth_encoding
      sts_request_data = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": audience,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          # With impersonation the exchange asks for the IAM scope; the
          # caller's scopes go on the impersonation request instead.
          "scope": (
              "https://www.googleapis.com/auth/iam"
              if impersonating
              else " ".join(used_scopes or [])
          ),
          "subject_token": subject_token,
          "subject_token_type": subject_token_type,
      }

      # Queue the canned responses and remember where each call will land.
      canned = []
      if credential_data:
          canned.append((http_client.OK, credential_data))
      sts_call_index = len(canned)
      canned.append((http_client.OK, sts_response))
      if impersonating:
          # Expected impersonation request and its canned response.
          expiry = _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(
              seconds=3600
          )
          impersonation_response = {
              "accessToken": "SA_ACCESS_TOKEN",
              "expireTime": expiry.isoformat("T") + "Z",
          }
          impersonation_headers = {
              "Content-Type": "application/json",
              "authorization": "Bearer {}".format(sts_response["access_token"]),
          }
          impersonation_request_data = {
              "delegates": None,
              "scope": used_scopes,
              "lifetime": "3600s",
          }
          impersonation_call_index = len(canned)
          canned.append((http_client.OK, impersonation_response))

      flattened = [item for pair in canned for item in pair]
      request = cls.make_mock_request(*flattened)

      credentials.refresh(request)

      calls = request.call_args_list
      assert len(calls) == len(canned)
      if credential_data:
          cls.assert_credential_request_kwargs(calls[0][1], None)
      # Verify token exchange request parameters.
      cls.assert_token_request_kwargs(
          calls[sts_call_index][1], sts_headers, sts_request_data, token_url
      )
      # Verify the impersonation request only when one was expected.
      if impersonating:
          cls.assert_impersonation_request_kwargs(
              calls[impersonation_call_index][1],
              impersonation_headers,
              impersonation_request_data,
              service_account_impersonation_url,
          )
          assert credentials.token == impersonation_response["accessToken"]
      else:
          assert credentials.token == sts_response["access_token"]
      assert credentials.quota_project_id == quota_project_id
      assert credentials.scopes == scopes
      assert credentials.default_scopes == default_scopes
  @classmethod
  def make_credentials(
      cls,
      client_id=None,
      client_secret=None,
      quota_project_id=None,
      scopes=None,
      default_scopes=None,
      service_account_impersonation_url=None,
      credential_source=None,
  ):
      """Create ``identity_pool.Credentials`` with the module's fixed settings.

      ``AUDIENCE``, ``SUBJECT_TOKEN_TYPE`` and ``TOKEN_URL`` are always used;
      every other option comes from the caller.
      """
      options = {
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "service_account_impersonation_url": service_account_impersonation_url,
          "credential_source": credential_source,
          "client_id": client_id,
          "client_secret": client_secret,
          "quota_project_id": quota_project_id,
          "scopes": scopes,
          "default_scopes": default_scopes,
      }
      return identity_pool.Credentials(**options)
  @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
  def test_from_info_full_options(self, mock_init):
      """``from_info`` passes every supplied option to the constructor."""
      info = {
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
          "client_id": CLIENT_ID,
          "client_secret": CLIENT_SECRET,
          "quota_project_id": QUOTA_PROJECT_ID,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
      }

      credentials = identity_pool.Credentials.from_info(info)

      # Confirm identity_pool.Credentials instantiated with expected attributes.
      assert isinstance(credentials, identity_pool.Credentials)
      expected_kwargs = {
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
          "client_id": CLIENT_ID,
          "client_secret": CLIENT_SECRET,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
          "quota_project_id": QUOTA_PROJECT_ID,
      }
      mock_init.assert_called_once_with(**expected_kwargs)
  @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
  def test_from_info_required_options_only(self, mock_init):
      """``from_info`` passes ``None`` for every option missing from the info."""
      info = {
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
      }

      credentials = identity_pool.Credentials.from_info(info)

      # Confirm identity_pool.Credentials instantiated with expected attributes.
      assert isinstance(credentials, identity_pool.Credentials)
      expected_kwargs = {
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "service_account_impersonation_url": None,
          "client_id": None,
          "client_secret": None,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
          "quota_project_id": None,
      }
      mock_init.assert_called_once_with(**expected_kwargs)
  @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
  def test_from_file_full_options(self, mock_init, tmpdir):
      """``from_file`` passes every option found in the JSON config file."""
      info = {
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
          "client_id": CLIENT_ID,
          "client_secret": CLIENT_SECRET,
          "quota_project_id": QUOTA_PROJECT_ID,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
      }
      # Write the configuration to a temporary file and load it back by path.
      config_path = tmpdir.join("config.json")
      config_path.write(json.dumps(info))

      credentials = identity_pool.Credentials.from_file(str(config_path))

      # Confirm identity_pool.Credentials instantiated with expected attributes.
      assert isinstance(credentials, identity_pool.Credentials)
      expected_kwargs = {
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
          "client_id": CLIENT_ID,
          "client_secret": CLIENT_SECRET,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
          "quota_project_id": QUOTA_PROJECT_ID,
      }
      mock_init.assert_called_once_with(**expected_kwargs)
  @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
  def test_from_file_required_options_only(self, mock_init, tmpdir):
      """``from_file`` passes ``None`` for options missing from the config file."""
      info = {
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
      }
      # Write the configuration to a temporary file and load it back by path.
      config_path = tmpdir.join("config.json")
      config_path.write(json.dumps(info))

      credentials = identity_pool.Credentials.from_file(str(config_path))

      # Confirm identity_pool.Credentials instantiated with expected attributes.
      assert isinstance(credentials, identity_pool.Credentials)
      expected_kwargs = {
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "service_account_impersonation_url": None,
          "client_id": None,
          "client_secret": None,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
          "quota_project_id": None,
      }
      mock_init.assert_called_once_with(**expected_kwargs)
  def test_constructor_invalid_options(self):
      """A credential_source holding only an unsupported key raises ValueError."""
      unsupported_source = {"unsupported": "value"}

      with pytest.raises(ValueError) as excinfo:
          self.make_credentials(credential_source=unsupported_source)

      pattern = r"Missing credential_source"
      assert excinfo.match(pattern)
  def test_constructor_invalid_options_url_and_file(self):
      """A credential_source with both "url" and "file" raises ValueError."""
      ambiguous_source = {
          "url": self.CREDENTIAL_URL,
          "file": SUBJECT_TOKEN_TEXT_FILE,
      }

      with pytest.raises(ValueError) as excinfo:
          self.make_credentials(credential_source=ambiguous_source)

      pattern = r"Ambiguous credential_source"
      assert excinfo.match(pattern)
  def test_constructor_invalid_options_environment_id(self):
      """A credential_source carrying "environment_id" raises ValueError."""
      source_with_environment = {
          "url": self.CREDENTIAL_URL,
          "environment_id": "aws1",
      }

      with pytest.raises(ValueError) as excinfo:
          self.make_credentials(credential_source=source_with_environment)

      pattern = r"Invalid Identity Pool credential_source field 'environment_id'"
      assert excinfo.match(pattern)
  def test_constructor_invalid_credential_source(self):
      """A credential_source that is not a dict raises ValueError."""
      not_a_dict = "non-dict"

      with pytest.raises(ValueError) as excinfo:
          self.make_credentials(credential_source=not_a_dict)

      pattern = r"Missing credential_source"
      assert excinfo.match(pattern)
  def test_constructor_invalid_credential_source_format_type(self):
      """A credential_source whose format type is "xml" raises ValueError."""
      xml_source = {"format": {"type": "xml"}}

      with pytest.raises(ValueError) as excinfo:
          self.make_credentials(credential_source=xml_source)

      pattern = r"Invalid credential_source format 'xml'"
      assert excinfo.match(pattern)
  def test_constructor_missing_subject_token_field_name(self):
      """A JSON format without ``subject_token_field_name`` raises ValueError."""
      json_source_without_field = {"format": {"type": "json"}}

      with pytest.raises(ValueError) as excinfo:
          self.make_credentials(credential_source=json_source_without_field)

      pattern = r"Missing subject_token_field_name for JSON credential_source format"
      assert excinfo.match(pattern)
  def test_info_with_file_credential_source(self):
      """``info`` echoes a file-based credential_source with the required fields.

      This test previously built its credentials from the URL-based source,
      so the file-based path was never exercised. The URL-based case is
      covered by ``test_info_with_url_credential_source``.
      """
      # Pass a copy so the comparison is against the untouched class constant.
      credentials = self.make_credentials(
          credential_source=self.CREDENTIAL_SOURCE_TEXT.copy()
      )

      assert credentials.info == {
          "type": "external_account",
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
      }
  def test_info_with_url_credential_source(self):
      """``info`` echoes a URL-based JSON credential_source with the required fields."""
      # Pass a copy so the comparison is against the untouched class constant.
      source = self.CREDENTIAL_SOURCE_JSON_URL.copy()
      credentials = self.make_credentials(credential_source=source)

      expected_info = {
          "type": "external_account",
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "credential_source": self.CREDENTIAL_SOURCE_JSON_URL,
      }
      assert credentials.info == expected_info
  def test_retrieve_subject_token_missing_subject_token(self, tmpdir):
      """An empty token file makes ``retrieve_subject_token`` raise RefreshError."""
      # Provide empty text file.
      blank_file = tmpdir.join("empty.txt")
      blank_file.write("")
      credentials = self.make_credentials(
          credential_source={"file": str(blank_file)}
      )

      with pytest.raises(exceptions.RefreshError) as excinfo:
          credentials.retrieve_subject_token(None)

      pattern = r"Missing subject_token in the credential_source file"
      assert excinfo.match(pattern)
  def test_retrieve_subject_token_text_file(self):
      """The text file source yields the exact contents of the fixture file."""
      source = self.CREDENTIAL_SOURCE_TEXT
      credentials = self.make_credentials(credential_source=source)

      # No transport request is needed for a file-based source.
      retrieved = credentials.retrieve_subject_token(None)

      assert retrieved == TEXT_FILE_SUBJECT_TOKEN
  def test_retrieve_subject_token_json_file(self):
      """The JSON file source yields the configured field of the fixture file."""
      source = self.CREDENTIAL_SOURCE_JSON
      credentials = self.make_credentials(credential_source=source)

      # No transport request is needed for a file-based source.
      retrieved = credentials.retrieve_subject_token(None)

      assert retrieved == JSON_FILE_SUBJECT_TOKEN
  def test_retrieve_subject_token_json_file_invalid_field_name(self):
      """A field name absent from the JSON file raises RefreshError.

      The expected message embeds a filesystem path, so it is escaped before
      being used as a regular expression; otherwise characters such as ``+``,
      ``(`` or ``\\`` in the path would change or break the match.
      """
      import re  # Local import: only needed to escape the path-bearing message.

      credential_source = {
          "file": SUBJECT_TOKEN_JSON_FILE,
          "format": {"type": "json", "subject_token_field_name": "not_found"},
      }
      credentials = self.make_credentials(credential_source=credential_source)

      with pytest.raises(exceptions.RefreshError) as excinfo:
          credentials.retrieve_subject_token(None)

      expected_message = (
          "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
              SUBJECT_TOKEN_JSON_FILE, "not_found"
          )
      )
      assert excinfo.match(re.escape(expected_message))
  def test_retrieve_subject_token_invalid_json(self, tmpdir):
      """A token file with malformed JSON raises RefreshError.

      The expected message embeds the temporary file path, so it is escaped
      before being used as a regular expression; otherwise characters such as
      ``+``, ``(`` or ``\\`` in the path would change or break the match.
      """
      import re  # Local import: only needed to escape the path-bearing message.

      # Provide an invalid JSON file. This should result in a JSON parsing error.
      invalid_json_file = tmpdir.join("invalid.json")
      invalid_json_file.write("{")
      credential_source = {
          "file": str(invalid_json_file),
          "format": {"type": "json", "subject_token_field_name": "access_token"},
      }
      credentials = self.make_credentials(credential_source=credential_source)

      with pytest.raises(exceptions.RefreshError) as excinfo:
          credentials.retrieve_subject_token(None)

      expected_message = (
          "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
              str(invalid_json_file), "access_token"
          )
      )
      assert excinfo.match(re.escape(expected_message))
  def test_retrieve_subject_token_file_not_found(self):
      """A credential_source pointing at a missing file raises RefreshError."""
      missing_source = {"file": "./not_found.txt"}
      credentials = self.make_credentials(credential_source=missing_source)

      with pytest.raises(exceptions.RefreshError) as excinfo:
          credentials.retrieve_subject_token(None)

      pattern = r"File './not_found.txt' was not found"
      assert excinfo.match(pattern)
  def test_refresh_text_file_success_without_impersonation_ignore_default_scopes(
      self,
  ):
      """Text file source with client auth: user scopes win over default scopes."""
      credentials = self.make_credentials(
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          # Test with text format type.
          credential_source=self.CREDENTIAL_SOURCE_TEXT,
          scopes=SCOPES,
          # Default scopes should be ignored.
          default_scopes=["ignored"],
      )

      expected = {
          "credentials": credentials,
          "audience": AUDIENCE,
          "subject_token": TEXT_FILE_SUBJECT_TOKEN,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "service_account_impersonation_url": None,
          "basic_auth_encoding": BASIC_AUTH_ENCODING,
          "quota_project_id": None,
          "used_scopes": SCOPES,
          "scopes": SCOPES,
          "default_scopes": ["ignored"],
      }
      self.assert_underlying_credentials_refresh(**expected)
  def test_refresh_text_file_success_without_impersonation_use_default_scopes(self):
      """Text file source with client auth: default scopes apply when none are given."""
      credentials = self.make_credentials(
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          # Test with text format type.
          credential_source=self.CREDENTIAL_SOURCE_TEXT,
          scopes=None,
          # Default scopes should be used since user specified scopes are none.
          default_scopes=SCOPES,
      )

      expected = {
          "credentials": credentials,
          "audience": AUDIENCE,
          "subject_token": TEXT_FILE_SUBJECT_TOKEN,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "service_account_impersonation_url": None,
          "basic_auth_encoding": BASIC_AUTH_ENCODING,
          "quota_project_id": None,
          "used_scopes": SCOPES,
          "scopes": None,
          "default_scopes": SCOPES,
      }
      self.assert_underlying_credentials_refresh(**expected)
  def test_refresh_text_file_success_with_impersonation_ignore_default_scopes(self):
      """Text file source with impersonation: user scopes win over default scopes."""
      # Service account impersonation is enabled; no client id/secret is
      # supplied, so no basic auth header is expected.
      credentials = self.make_credentials(
          # Test with text format type.
          credential_source=self.CREDENTIAL_SOURCE_TEXT,
          service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=SCOPES,
          # Default scopes should be ignored.
          default_scopes=["ignored"],
      )

      expected = {
          "credentials": credentials,
          "audience": AUDIENCE,
          "subject_token": TEXT_FILE_SUBJECT_TOKEN,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
          "basic_auth_encoding": None,
          "quota_project_id": None,
          "used_scopes": SCOPES,
          "scopes": SCOPES,
          "default_scopes": ["ignored"],
      }
      self.assert_underlying_credentials_refresh(**expected)
  def test_refresh_text_file_success_with_impersonation_use_default_scopes(self):
      """Text file source with impersonation: default scopes apply when none are given."""
      # Service account impersonation is enabled with default scopes only (no
      # user scopes); no client id/secret is supplied, so no basic auth header
      # is expected.
      credentials = self.make_credentials(
          # Test with text format type.
          credential_source=self.CREDENTIAL_SOURCE_TEXT,
          service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=None,
          # Default scopes should be used since user specified scopes are none.
          default_scopes=SCOPES,
      )

      expected = {
          "credentials": credentials,
          "audience": AUDIENCE,
          "subject_token": TEXT_FILE_SUBJECT_TOKEN,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
          "basic_auth_encoding": None,
          "quota_project_id": None,
          "used_scopes": SCOPES,
          "scopes": None,
          "default_scopes": SCOPES,
      }
      self.assert_underlying_credentials_refresh(**expected)
  def test_refresh_json_file_success_without_impersonation(self):
      """JSON file source with client auth refreshes via the token exchange alone."""
      credentials = self.make_credentials(
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          # Test with JSON format type.
          credential_source=self.CREDENTIAL_SOURCE_JSON,
          scopes=SCOPES,
      )

      expected = {
          "credentials": credentials,
          "audience": AUDIENCE,
          "subject_token": JSON_FILE_SUBJECT_TOKEN,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "service_account_impersonation_url": None,
          "basic_auth_encoding": BASIC_AUTH_ENCODING,
          "quota_project_id": None,
          "used_scopes": SCOPES,
          "scopes": SCOPES,
          "default_scopes": None,
      }
      self.assert_underlying_credentials_refresh(**expected)
  def test_refresh_json_file_success_with_impersonation(self):
      """JSON file source with impersonation refreshes through both requests."""
      # Service account impersonation is enabled; no client id/secret is
      # supplied, so no basic auth header is expected.
      credentials = self.make_credentials(
          # Test with JSON format type.
          credential_source=self.CREDENTIAL_SOURCE_JSON,
          service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=SCOPES,
      )

      expected = {
          "credentials": credentials,
          "audience": AUDIENCE,
          "subject_token": JSON_FILE_SUBJECT_TOKEN,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
          "basic_auth_encoding": None,
          "quota_project_id": None,
          "used_scopes": SCOPES,
          "scopes": SCOPES,
          "default_scopes": None,
      }
      self.assert_underlying_credentials_refresh(**expected)
  def test_refresh_with_retrieve_subject_token_error(self):
      """``refresh`` surfaces the RefreshError raised while reading the token file.

      The expected message embeds a filesystem path, so it is escaped before
      being used as a regular expression; otherwise characters such as ``+``,
      ``(`` or ``\\`` in the path would change or break the match.
      """
      import re  # Local import: only needed to escape the path-bearing message.

      credential_source = {
          "file": SUBJECT_TOKEN_JSON_FILE,
          "format": {"type": "json", "subject_token_field_name": "not_found"},
      }
      credentials = self.make_credentials(credential_source=credential_source)

      with pytest.raises(exceptions.RefreshError) as excinfo:
          credentials.refresh(None)

      expected_message = (
          "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
              SUBJECT_TOKEN_JSON_FILE, "not_found"
          )
      )
      assert excinfo.match(re.escape(expected_message))
  def test_retrieve_subject_token_from_url(self):
      """The text URL source returns the response body as the subject token."""
      source = self.CREDENTIAL_SOURCE_TEXT_URL
      credentials = self.make_credentials(credential_source=source)
      mocked_request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN)

      retrieved = credentials.retrieve_subject_token(mocked_request)

      assert retrieved == TEXT_FILE_SUBJECT_TOKEN
      # The single recorded call must be a header-less GET to the source URL.
      first_call_kwargs = mocked_request.call_args_list[0][1]
      self.assert_credential_request_kwargs(first_call_kwargs, None)
  def test_retrieve_subject_token_from_url_with_headers(self):
      """Headers configured on a text URL source are sent with the GET request."""
      source = {"url": self.CREDENTIAL_URL, "headers": {"foo": "bar"}}
      credentials = self.make_credentials(credential_source=source)
      mocked_request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN)

      retrieved = credentials.retrieve_subject_token(mocked_request)

      assert retrieved == TEXT_FILE_SUBJECT_TOKEN
      first_call_kwargs = mocked_request.call_args_list[0][1]
      self.assert_credential_request_kwargs(first_call_kwargs, {"foo": "bar"})
  def test_retrieve_subject_token_from_url_json(self):
      """The JSON URL source returns the configured field of the response."""
      source = self.CREDENTIAL_SOURCE_JSON_URL
      credentials = self.make_credentials(credential_source=source)
      mocked_request = self.make_mock_request(token_data=JSON_FILE_CONTENT)

      retrieved = credentials.retrieve_subject_token(mocked_request)

      assert retrieved == JSON_FILE_SUBJECT_TOKEN
      # The single recorded call must be a header-less GET to the source URL.
      first_call_kwargs = mocked_request.call_args_list[0][1]
      self.assert_credential_request_kwargs(first_call_kwargs, None)
  def test_retrieve_subject_token_from_url_json_with_headers(self):
      """Headers configured on a JSON URL source are sent with the GET request."""
      source = {
          "url": self.CREDENTIAL_URL,
          "format": {"type": "json", "subject_token_field_name": "access_token"},
          "headers": {"foo": "bar"},
      }
      credentials = self.make_credentials(credential_source=source)
      mocked_request = self.make_mock_request(token_data=JSON_FILE_CONTENT)

      retrieved = credentials.retrieve_subject_token(mocked_request)

      assert retrieved == JSON_FILE_SUBJECT_TOKEN
      first_call_kwargs = mocked_request.call_args_list[0][1]
      self.assert_credential_request_kwargs(first_call_kwargs, {"foo": "bar"})
  def test_retrieve_subject_token_from_url_not_found(self):
      """A 404 response from the credential URL raises RefreshError."""
      source = self.CREDENTIAL_SOURCE_TEXT_URL
      credentials = self.make_credentials(credential_source=source)
      failing_request = self.make_mock_request(
          token_status=404, token_data=JSON_FILE_CONTENT
      )

      with pytest.raises(exceptions.RefreshError) as excinfo:
          credentials.retrieve_subject_token(failing_request)

      pattern = "Unable to retrieve Identity Pool subject token"
      assert excinfo.match(pattern)
  def test_retrieve_subject_token_from_url_json_invalid_field(self):
      """A field name absent from the JSON URL response raises RefreshError."""
      source = {
          "url": self.CREDENTIAL_URL,
          "format": {"type": "json", "subject_token_field_name": "not_found"},
      }
      credentials = self.make_credentials(credential_source=source)
      mocked_request = self.make_mock_request(token_data=JSON_FILE_CONTENT)

      with pytest.raises(exceptions.RefreshError) as excinfo:
          credentials.retrieve_subject_token(mocked_request)

      pattern = "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
          self.CREDENTIAL_URL, "not_found"
      )
      assert excinfo.match(pattern)
  def test_retrieve_subject_token_from_url_json_invalid_format(self):
      """A malformed JSON body from the credential URL raises RefreshError."""
      source = self.CREDENTIAL_SOURCE_JSON_URL
      credentials = self.make_credentials(credential_source=source)
      # "{" is not valid JSON, so parsing the response body must fail.
      mocked_request = self.make_mock_request(token_data="{")

      with pytest.raises(exceptions.RefreshError) as excinfo:
          credentials.retrieve_subject_token(mocked_request)

      pattern = "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
          self.CREDENTIAL_URL, "access_token"
      )
      assert excinfo.match(pattern)
  def test_refresh_text_file_success_without_impersonation_url(self):
      """Text URL source with client auth refreshes via the token exchange alone."""
      credentials = self.make_credentials(
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          # Test with text format type.
          credential_source=self.CREDENTIAL_SOURCE_TEXT_URL,
          scopes=SCOPES,
      )

      expected = {
          "credentials": credentials,
          "audience": AUDIENCE,
          "subject_token": TEXT_FILE_SUBJECT_TOKEN,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "service_account_impersonation_url": None,
          "basic_auth_encoding": BASIC_AUTH_ENCODING,
          "quota_project_id": None,
          "used_scopes": SCOPES,
          "scopes": SCOPES,
          "default_scopes": None,
          # Body served by the mocked credential URL before the token exchange.
          "credential_data": TEXT_FILE_SUBJECT_TOKEN,
      }
      self.assert_underlying_credentials_refresh(**expected)
  def test_refresh_text_file_success_with_impersonation_url(self):
      """Text URL source with impersonation refreshes through all three requests."""
      # Service account impersonation is enabled; no client id/secret is
      # supplied, so no basic auth header is expected.
      credentials = self.make_credentials(
          # Test with text format type.
          credential_source=self.CREDENTIAL_SOURCE_TEXT_URL,
          service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=SCOPES,
      )

      expected = {
          "credentials": credentials,
          "audience": AUDIENCE,
          "subject_token": TEXT_FILE_SUBJECT_TOKEN,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
          "basic_auth_encoding": None,
          "quota_project_id": None,
          "used_scopes": SCOPES,
          "scopes": SCOPES,
          "default_scopes": None,
          # Body served by the mocked credential URL before the token exchange.
          "credential_data": TEXT_FILE_SUBJECT_TOKEN,
      }
      self.assert_underlying_credentials_refresh(**expected)
  def test_refresh_json_file_success_without_impersonation_url(self):
      """JSON URL source with client auth refreshes via the token exchange alone."""
      credentials = self.make_credentials(
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          # Test with JSON format type.
          credential_source=self.CREDENTIAL_SOURCE_JSON_URL,
          scopes=SCOPES,
      )

      expected = {
          "credentials": credentials,
          "audience": AUDIENCE,
          "subject_token": JSON_FILE_SUBJECT_TOKEN,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "service_account_impersonation_url": None,
          "basic_auth_encoding": BASIC_AUTH_ENCODING,
          "quota_project_id": None,
          "used_scopes": SCOPES,
          "scopes": SCOPES,
          "default_scopes": None,
          # Body served by the mocked credential URL before the token exchange.
          "credential_data": JSON_FILE_CONTENT,
      }
      self.assert_underlying_credentials_refresh(**expected)
  def test_refresh_json_file_success_with_impersonation_url(self):
      """JSON URL source with impersonation refreshes through all three requests."""
      # Service account impersonation is enabled; no client id/secret is
      # supplied, so no basic auth header is expected.
      credentials = self.make_credentials(
          # Test with JSON format type.
          credential_source=self.CREDENTIAL_SOURCE_JSON_URL,
          service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=SCOPES,
      )

      expected = {
          "credentials": credentials,
          "audience": AUDIENCE,
          "subject_token": JSON_FILE_SUBJECT_TOKEN,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
          "basic_auth_encoding": None,
          "quota_project_id": None,
          "used_scopes": SCOPES,
          "scopes": SCOPES,
          "default_scopes": None,
          # Body served by the mocked credential URL before the token exchange.
          "credential_data": JSON_FILE_CONTENT,
      }
      self.assert_underlying_credentials_refresh(**expected)
  def test_refresh_with_retrieve_subject_token_error_url(self):
      """``refresh`` surfaces the RefreshError raised while parsing the URL response."""
      source = {
          "url": self.CREDENTIAL_URL,
          "format": {"type": "json", "subject_token_field_name": "not_found"},
      }
      credentials = self.make_credentials(credential_source=source)
      mocked_request = self.make_mock_request(token_data=JSON_FILE_CONTENT)

      with pytest.raises(exceptions.RefreshError) as excinfo:
          credentials.refresh(mocked_request)

      pattern = "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
          self.CREDENTIAL_URL, "not_found"
      )
      assert excinfo.match(pattern)