test_identity_pool.py 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302
  1. # Copyright 2020 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import datetime
  15. import http.client as http_client
  16. import json
  17. import os
  18. import urllib
  19. import mock
  20. import pytest # type: ignore
  21. from google.auth import _helpers
  22. from google.auth import exceptions
  23. from google.auth import identity_pool
  24. from google.auth import metrics
  25. from google.auth import transport
  # Client credentials. BASIC_AUTH_ENCODING is the Base64 encoding of
  # "username:password", i.e. CLIENT_ID and CLIENT_SECRET joined by a colon.
  CLIENT_ID = "username"
  CLIENT_SECRET = "password"
  BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="

  # Service account impersonation endpoint, assembled from a base URL and a
  # route that embeds the service account email.
  SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
  SERVICE_ACCOUNT_IMPERSONATION_URL_BASE = "https://us-east1-iamcredentials.googleapis.com"
  SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE = (
      "/v1/projects/-/serviceAccounts/" + SERVICE_ACCOUNT_EMAIL + ":generateAccessToken"
  )
  SERVICE_ACCOUNT_IMPERSONATION_URL = "{}{}".format(
      SERVICE_ACCOUNT_IMPERSONATION_URL_BASE, SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE
  )

  QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
  SCOPES = ["scope1", "scope2"]
  42. import yatest.common as yc
  # Fixture files are looked up in a "data" directory beside the path that
  # yc.source_path(__file__) resolves to.
  DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "data")
  SUBJECT_TOKEN_TEXT_FILE = os.path.join(DATA_DIR, "external_subject_token.txt")
  SUBJECT_TOKEN_JSON_FILE = os.path.join(DATA_DIR, "external_subject_token.json")
  SUBJECT_TOKEN_FIELD_NAME = "access_token"

  # Load the expected subject tokens once, at import time.
  with open(SUBJECT_TOKEN_TEXT_FILE) as fh:
      TEXT_FILE_SUBJECT_TOKEN = fh.read()

  with open(SUBJECT_TOKEN_JSON_FILE) as fh:
      JSON_FILE_CONTENT = json.loads(fh.read())
  JSON_FILE_SUBJECT_TOKEN = JSON_FILE_CONTENT.get(SUBJECT_TOKEN_FIELD_NAME)
  # STS endpoints used for token exchange and token introspection.
  TOKEN_URL = "https://sts.googleapis.com/v1/token"
  TOKEN_INFO_URL = "https://sts.googleapis.com/v1/introspect"

  # Workload identity pool settings.
  SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
  AUDIENCE = (
      "//iam.googleapis.com/projects/123456/locations/global"
      "/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
  )

  # Workforce pool settings.
  WORKFORCE_AUDIENCE = "//iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID"
  WORKFORCE_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token"
  WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER"

  DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"
  # HTTPS STS hosts in the accepted shapes: bare, region as a subdomain on
  # either side of "sts", region joined to "sts" with a hyphen, a URL with a
  # path and query, and the "sts-<name>.p.googleapis.com" form.
  # NOTE(review): presumably consumed by parametrized URL-validation tests
  # outside this view - confirm.
  VALID_TOKEN_URLS = [
      "https://sts.googleapis.com",
      "https://us-east-1.sts.googleapis.com",
      "https://US-EAST-1.sts.googleapis.com",
      "https://sts.us-east-1.googleapis.com",
      "https://sts.US-WEST-1.googleapis.com",
      "https://us-east-1-sts.googleapis.com",
      "https://US-WEST-1-sts.googleapis.com",
      "https://us-west-1-sts.googleapis.com/path?query",
      "https://sts-us-east-1.p.googleapis.com",
  ]
  # Counterpart of VALID_TOKEN_URLS: another service's host, missing or
  # non-HTTPS schemes, malformed hosts (embedded tab, space, slash, extra
  # dots or labels), look-alike domains with extra suffixes, and malformed
  # variants of the ".p.googleapis.com" form.
  # NOTE(review): presumably consumed by parametrized URL-validation tests
  # outside this view - confirm.
  INVALID_TOKEN_URLS = [
      "https://iamcredentials.googleapis.com",
      "sts.googleapis.com",
      "https://",
      "http://sts.googleapis.com",
      "https://st.s.googleapis.com",
      "https://us-eas\t-1.sts.googleapis.com",
      "https:/us-east-1.sts.googleapis.com",
      "https://US-WE/ST-1-sts.googleapis.com",
      "https://sts-us-east-1.googleapis.com",
      "https://sts-US-WEST-1.googleapis.com",
      "testhttps://us-east-1.sts.googleapis.com",
      "https://us-east-1.sts.googleapis.comevil.com",
      "https://us-east-1.us-east-1.sts.googleapis.com",
      "https://us-ea.s.t.sts.googleapis.com",
      "https://sts.googleapis.comevil.com",
      "hhttps://us-east-1.sts.googleapis.com",
      "https://us- -1.sts.googleapis.com",
      "https://-sts.googleapis.com",
      "https://us-east-1.sts.googleapis.com.evil.com",
      "https://sts.pgoogleapis.com",
      "https://p.googleapis.com",
      "https://sts.p.com",
      "http://sts.p.googleapis.com",
      "https://xyz-sts.p.googleapis.com",
      "https://sts-xyz.123.p.googleapis.com",
      "https://sts-xyz.p1.googleapis.com",
      "https://sts-xyz.p.foo.com",
      "https://sts-xyz.p.foo.googleapis.com",
  ]
  # HTTPS iamcredentials hosts in the accepted shapes: bare, region as a
  # subdomain on either side of "iamcredentials", region joined with a
  # hyphen, a URL with a path and query, and the
  # "iamcredentials-<name>.p.googleapis.com" form.
  # NOTE(review): presumably consumed by parametrized URL-validation tests
  # outside this view - confirm.
  VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS = [
      "https://iamcredentials.googleapis.com",
      "https://us-east-1.iamcredentials.googleapis.com",
      "https://US-EAST-1.iamcredentials.googleapis.com",
      "https://iamcredentials.us-east-1.googleapis.com",
      "https://iamcredentials.US-WEST-1.googleapis.com",
      "https://us-east-1-iamcredentials.googleapis.com",
      "https://US-WEST-1-iamcredentials.googleapis.com",
      "https://us-west-1-iamcredentials.googleapis.com/path?query",
      "https://iamcredentials-us-east-1.p.googleapis.com",
  ]
  # Counterpart of VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS: another
  # service's host, missing or non-HTTPS schemes, malformed hosts (embedded
  # tab, space, slash, extra dots or labels), look-alike domains with extra
  # suffixes, and malformed variants of the ".p.googleapis.com" form.
  # NOTE(review): presumably consumed by parametrized URL-validation tests
  # outside this view - confirm.
  INVALID_SERVICE_ACCOUNT_IMPERSONATION_URLS = [
      "https://sts.googleapis.com",
      "iamcredentials.googleapis.com",
      "https://",
      "http://iamcredentials.googleapis.com",
      "https://iamcre.dentials.googleapis.com",
      "https://us-eas\t-1.iamcredentials.googleapis.com",
      "https:/us-east-1.iamcredentials.googleapis.com",
      "https://US-WE/ST-1-iamcredentials.googleapis.com",
      "https://iamcredentials-us-east-1.googleapis.com",
      "https://iamcredentials-US-WEST-1.googleapis.com",
      "testhttps://us-east-1.iamcredentials.googleapis.com",
      "https://us-east-1.iamcredentials.googleapis.comevil.com",
      "https://us-east-1.us-east-1.iamcredentials.googleapis.com",
      "https://us-ea.s.t.iamcredentials.googleapis.com",
      "https://iamcredentials.googleapis.comevil.com",
      "hhttps://us-east-1.iamcredentials.googleapis.com",
      "https://us- -1.iamcredentials.googleapis.com",
      "https://-iamcredentials.googleapis.com",
      "https://us-east-1.iamcredentials.googleapis.com.evil.com",
      "https://iamcredentials.pgoogleapis.com",
      "https://p.googleapis.com",
      "https://iamcredentials.p.com",
      "http://iamcredentials.p.googleapis.com",
      "https://xyz-iamcredentials.p.googleapis.com",
      "https://iamcredentials-xyz.123.p.googleapis.com",
      "https://iamcredentials-xyz.p1.googleapis.com",
      "https://iamcredentials-xyz.p.foo.com",
      "https://iamcredentials-xyz.p.foo.googleapis.com",
  ]
  144. class TestCredentials(object):
  # File-backed credential sources: plain text, and JSON with the token
  # stored under the "access_token" key.
  CREDENTIAL_SOURCE_TEXT = {"file": SUBJECT_TOKEN_TEXT_FILE}
  CREDENTIAL_SOURCE_JSON = {
      "file": SUBJECT_TOKEN_JSON_FILE,
      "format": {"type": "json", "subject_token_field_name": "access_token"},
  }

  # URL-backed credential sources mirroring the two file-backed ones.
  CREDENTIAL_URL = "http://fakeurl.com"
  CREDENTIAL_SOURCE_TEXT_URL = {"url": CREDENTIAL_URL}
  CREDENTIAL_SOURCE_JSON_URL = {
      "url": CREDENTIAL_URL,
      "format": {"type": "json", "subject_token_field_name": "access_token"},
  }

  # Canned token-exchange response body used as the mocked STS reply.
  SUCCESS_RESPONSE = {
      "access_token": "ACCESS_TOKEN",
      "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
      "token_type": "Bearer",
      "expires_in": 3600,
      "scope": " ".join(SCOPES),
  }
  @classmethod
  def make_mock_response(cls, status, data):
      """Build an autospec'd ``transport.Response`` with ``status`` and ``data``.

      A ``dict`` payload is serialized to UTF-8 encoded JSON bytes; any other
      payload is attached to the response unchanged.
      """
      payload = json.dumps(data).encode("utf-8") if isinstance(data, dict) else data
      mocked = mock.create_autospec(transport.Response, instance=True)
      mocked.status = status
      mocked.data = payload
      return mocked
  @classmethod
  def make_mock_request(
      cls, token_status=http_client.OK, token_data=None, *extra_requests
  ):
      """Build an autospec'd ``transport.Request`` that replays canned responses.

      The first response is built from ``token_status`` / ``token_data``.
      ``extra_requests`` is a flat sequence of alternating ``status, data``
      values; each pair adds one more response (for example the service
      account impersonation reply), returned in order on successive calls.
      """
      queued = [cls.make_mock_response(token_status, token_data)]
      # Walk the flat varargs two at a time: (status, data), (status, data), ...
      for offset in range(0, len(extra_requests), 2):
          status = extra_requests[offset]
          data = extra_requests[offset + 1]
          queued.append(cls.make_mock_response(status, data))

      mocked_request = mock.create_autospec(transport.Request)
      mocked_request.side_effect = queued
      return mocked_request
  @classmethod
  def assert_credential_request_kwargs(
      cls, request_kwargs, headers, url=CREDENTIAL_URL
  ):
      """Assert ``request_kwargs`` describe a body-less GET to ``url``.

      Checks the URL, the HTTP method and the headers in that order, then
      that no request body was supplied.
      """
      expected = (("url", url), ("method", "GET"), ("headers", headers))
      for key, wanted in expected:
          assert request_kwargs[key] == wanted
      assert request_kwargs.get("body") is None
  @classmethod
  def assert_token_request_kwargs(
      cls, request_kwargs, headers, request_data, token_url=TOKEN_URL
  ):
      """Assert ``request_kwargs`` describe the expected token exchange POST.

      Checks URL, method and headers, then parses the form-encoded body and
      compares every decoded field against ``request_data``.
      """
      assert request_kwargs["url"] == token_url
      assert request_kwargs["method"] == "POST"
      assert request_kwargs["headers"] == headers

      raw_body = request_kwargs["body"]
      assert raw_body is not None

      fields = urllib.parse.parse_qsl(raw_body)
      assert len(fields) == len(request_data)
      # Keys and values are decoded from UTF-8 before comparison.
      for raw_key, raw_value in fields:
          assert raw_value.decode("utf-8") == request_data[raw_key.decode("utf-8")]
  @classmethod
  def assert_impersonation_request_kwargs(
      cls,
      request_kwargs,
      headers,
      request_data,
      service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
  ):
      """Assert ``request_kwargs`` describe the expected impersonation POST.

      Checks URL, method and headers, then decodes the UTF-8 JSON body and
      compares it against ``request_data``.
      """
      assert request_kwargs["url"] == service_account_impersonation_url
      assert request_kwargs["method"] == "POST"
      assert request_kwargs["headers"] == headers

      raw_body = request_kwargs["body"]
      assert raw_body is not None
      assert json.loads(raw_body.decode("utf-8")) == request_data
  @classmethod
  def assert_underlying_credentials_refresh(
      cls,
      credentials,
      audience,
      subject_token,
      subject_token_type,
      token_url,
      service_account_impersonation_url=None,
      basic_auth_encoding=None,
      quota_project_id=None,
      used_scopes=None,
      credential_data=None,
      scopes=None,
      default_scopes=None,
      workforce_pool_user_project=None,
  ):
      """Refresh ``credentials`` against a mocked transport and verify the traffic.

      Mocks, in order, the optional credential-source fetch (when
      ``credential_data`` is given), the STS token exchange and the optional
      service account impersonation call. After ``credentials.refresh`` it
      asserts that each request carried the expected parameters and that the
      resulting token, quota project and scopes match the expected values.
      """
      # Expected STS token exchange response and request headers.
      sts_response = cls.SUCCESS_RESPONSE.copy()
      sts_headers = {"Content-Type": "application/x-www-form-urlencoded"}
      if basic_auth_encoding:
          sts_headers["Authorization"] = "Basic " + basic_auth_encoding

      byoid_options = {
          "sa-impersonation": (
              "true" if credentials._service_account_impersonation_url else "false"
          ),
          "config-lifetime": "false",
          "source": "file" if credentials._credential_source_file else "url",
      }
      sts_headers["x-goog-api-client"] = metrics.byoid_metrics_header(byoid_options)

      # Expected STS request body. With impersonation the exchange asks for
      # the IAM scope; otherwise it asks for the caller's scopes directly.
      sts_request_data = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": audience,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "scope": (
              "https://www.googleapis.com/auth/iam"
              if service_account_impersonation_url
              else " ".join(used_scopes or [])
          ),
          "subject_token": subject_token,
          "subject_token_type": subject_token_type,
      }
      if workforce_pool_user_project:
          user_project_json = json.dumps({"userProject": workforce_pool_user_project})
          sts_request_data["options"] = urllib.parse.quote(user_project_json)

      impersonation_metrics_header = (
          "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/imp"
      )

      if service_account_impersonation_url:
          # Expected service account impersonation request/response.
          expiry = _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(
              seconds=3600
          )
          impersonation_response = {
              "accessToken": "SA_ACCESS_TOKEN",
              "expireTime": expiry.isoformat("T") + "Z",
          }
          impersonation_headers = {
              "Content-Type": "application/json",
              "authorization": "Bearer {}".format(sts_response["access_token"]),
              "x-goog-api-client": impersonation_metrics_header,
              "x-allowed-locations": "0x0",
          }
          impersonation_request_data = {
              "delegates": None,
              "scope": used_scopes,
              "lifetime": "3600s",
          }

      # Queue the mocked replies in call order, remembering where the token
      # exchange and impersonation calls land.
      expected_calls = []
      if credential_data:
          expected_calls.append((http_client.OK, credential_data))
      token_call_pos = len(expected_calls)
      expected_calls.append((http_client.OK, sts_response))
      if service_account_impersonation_url:
          impersonation_call_pos = len(expected_calls)
          expected_calls.append((http_client.OK, impersonation_response))

      # make_mock_request takes a flat status, data, status, data, ... list.
      flat_args = []
      for status, payload in expected_calls:
          flat_args.extend((status, payload))
      request = cls.make_mock_request(*flat_args)

      with mock.patch(
          "google.auth.metrics.token_request_access_token_impersonate",
          return_value=impersonation_metrics_header,
      ):
          credentials.refresh(request)

      recorded = request.call_args_list
      assert len(recorded) == len(expected_calls)

      if credential_data:
          cls.assert_credential_request_kwargs(recorded[0][1], None)

      # Verify token exchange request parameters.
      cls.assert_token_request_kwargs(
          recorded[token_call_pos][1], sts_headers, sts_request_data, token_url
      )

      # Verify the impersonation request only when one was expected; the
      # final token then comes from the impersonation response instead.
      if service_account_impersonation_url:
          cls.assert_impersonation_request_kwargs(
              recorded[impersonation_call_pos][1],
              impersonation_headers,
              impersonation_request_data,
              service_account_impersonation_url,
          )
          assert credentials.token == impersonation_response["accessToken"]
      else:
          assert credentials.token == sts_response["access_token"]

      assert credentials.quota_project_id == quota_project_id
      assert credentials.scopes == scopes
      assert credentials.default_scopes == default_scopes
  @classmethod
  def make_credentials(
      cls,
      audience=AUDIENCE,
      subject_token_type=SUBJECT_TOKEN_TYPE,
      token_url=TOKEN_URL,
      token_info_url=TOKEN_INFO_URL,
      client_id=None,
      client_secret=None,
      quota_project_id=None,
      scopes=None,
      default_scopes=None,
      service_account_impersonation_url=None,
      credential_source=None,
      workforce_pool_user_project=None,
  ):
      """Construct ``identity_pool.Credentials``, forwarding every argument by keyword.

      Defaults supply the module-level audience, subject token type and STS
      URLs so tests only need to pass what they want to vary.
      """
      init_kwargs = {
          "audience": audience,
          "subject_token_type": subject_token_type,
          "token_url": token_url,
          "token_info_url": token_info_url,
          "service_account_impersonation_url": service_account_impersonation_url,
          "credential_source": credential_source,
          "client_id": client_id,
          "client_secret": client_secret,
          "quota_project_id": quota_project_id,
          "scopes": scopes,
          "default_scopes": default_scopes,
          "workforce_pool_user_project": workforce_pool_user_project,
      }
      return identity_pool.Credentials(**init_kwargs)
  @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
  def test_from_info_full_options(self, mock_init):
      """``from_info`` with every option set forwards them all to ``__init__``."""
      info = {
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "token_info_url": TOKEN_INFO_URL,
          "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
          "service_account_impersonation": {"token_lifetime_seconds": 2800},
          "client_id": CLIENT_ID,
          "client_secret": CLIENT_SECRET,
          "quota_project_id": QUOTA_PROJECT_ID,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
      }
      expected_init_kwargs = {
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "token_info_url": TOKEN_INFO_URL,
          "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
          "service_account_impersonation_options": {"token_lifetime_seconds": 2800},
          "client_id": CLIENT_ID,
          "client_secret": CLIENT_SECRET,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
          "quota_project_id": QUOTA_PROJECT_ID,
          "workforce_pool_user_project": None,
          "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
      }

      credentials = identity_pool.Credentials.from_info(info)

      # Confirm identity_pool.Credentials instantiated with expected attributes.
      assert isinstance(credentials, identity_pool.Credentials)
      mock_init.assert_called_once_with(**expected_init_kwargs)
  @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
  def test_from_info_required_options_only(self, mock_init):
      """``from_info`` with only required keys fills the rest with defaults."""
      info = {
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
      }
      expected_init_kwargs = {
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "token_info_url": None,
          "service_account_impersonation_url": None,
          "service_account_impersonation_options": {},
          "client_id": None,
          "client_secret": None,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
          "quota_project_id": None,
          "workforce_pool_user_project": None,
          "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
      }

      credentials = identity_pool.Credentials.from_info(info)

      # Confirm identity_pool.Credentials instantiated with expected attributes.
      assert isinstance(credentials, identity_pool.Credentials)
      mock_init.assert_called_once_with(**expected_init_kwargs)
  @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
  def test_from_info_workforce_pool(self, mock_init):
      """``from_info`` forwards ``workforce_pool_user_project`` for a workforce audience."""
      info = {
          "audience": WORKFORCE_AUDIENCE,
          "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
          "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
      }
      expected_init_kwargs = {
          "audience": WORKFORCE_AUDIENCE,
          "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "token_info_url": None,
          "service_account_impersonation_url": None,
          "service_account_impersonation_options": {},
          "client_id": None,
          "client_secret": None,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
          "quota_project_id": None,
          "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
          "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
      }

      credentials = identity_pool.Credentials.from_info(info)

      # Confirm identity_pool.Credentials instantiated with expected attributes.
      assert isinstance(credentials, identity_pool.Credentials)
      mock_init.assert_called_once_with(**expected_init_kwargs)
  @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
  def test_from_file_full_options(self, mock_init, tmpdir):
      """``from_file`` with every option set forwards them all to ``__init__``."""
      info = {
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "token_info_url": TOKEN_INFO_URL,
          "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
          "service_account_impersonation": {"token_lifetime_seconds": 2800},
          "client_id": CLIENT_ID,
          "client_secret": CLIENT_SECRET,
          "quota_project_id": QUOTA_PROJECT_ID,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
      }
      expected_init_kwargs = {
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "token_info_url": TOKEN_INFO_URL,
          "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
          "service_account_impersonation_options": {"token_lifetime_seconds": 2800},
          "client_id": CLIENT_ID,
          "client_secret": CLIENT_SECRET,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
          "quota_project_id": QUOTA_PROJECT_ID,
          "workforce_pool_user_project": None,
          "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
      }

      # Write the configuration to a temporary JSON file and load it back.
      config_path = tmpdir.join("config.json")
      config_path.write(json.dumps(info))
      credentials = identity_pool.Credentials.from_file(str(config_path))

      # Confirm identity_pool.Credentials instantiated with expected attributes.
      assert isinstance(credentials, identity_pool.Credentials)
      mock_init.assert_called_once_with(**expected_init_kwargs)
  @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
  def test_from_file_required_options_only(self, mock_init, tmpdir):
      """``from_file`` with only required keys fills the rest with defaults."""
      info = {
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
      }
      expected_init_kwargs = {
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "token_info_url": None,
          "service_account_impersonation_url": None,
          "service_account_impersonation_options": {},
          "client_id": None,
          "client_secret": None,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
          "quota_project_id": None,
          "workforce_pool_user_project": None,
          "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
      }

      # Write the configuration to a temporary JSON file and load it back.
      config_path = tmpdir.join("config.json")
      config_path.write(json.dumps(info))
      credentials = identity_pool.Credentials.from_file(str(config_path))

      # Confirm identity_pool.Credentials instantiated with expected attributes.
      assert isinstance(credentials, identity_pool.Credentials)
      mock_init.assert_called_once_with(**expected_init_kwargs)
  @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
  def test_from_file_workforce_pool(self, mock_init, tmpdir):
      """``from_file`` forwards ``workforce_pool_user_project`` for a workforce audience."""
      info = {
          "audience": WORKFORCE_AUDIENCE,
          "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
          "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
      }
      expected_init_kwargs = {
          "audience": WORKFORCE_AUDIENCE,
          "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "token_info_url": None,
          "service_account_impersonation_url": None,
          "service_account_impersonation_options": {},
          "client_id": None,
          "client_secret": None,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
          "quota_project_id": None,
          "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
          "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
      }

      # Write the configuration to a temporary JSON file and load it back.
      config_path = tmpdir.join("config.json")
      config_path.write(json.dumps(info))
      credentials = identity_pool.Credentials.from_file(str(config_path))

      # Confirm identity_pool.Credentials instantiated with expected attributes.
      assert isinstance(credentials, identity_pool.Credentials)
      mock_init.assert_called_once_with(**expected_init_kwargs)
  def test_constructor_nonworkforce_with_workforce_pool_user_project(self):
      """A workload (non-workforce) audience rejects ``workforce_pool_user_project``."""
      expected_error = (
          "workforce_pool_user_project should not be set for non-workforce "
          "pool credentials"
      )
      with pytest.raises(ValueError, match=expected_error):
          self.make_credentials(
              audience=AUDIENCE,
              workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
          )
  def test_constructor_invalid_options(self):
      """A credential source with neither ``file`` nor ``url`` is rejected."""
      unsupported_source = {"unsupported": "value"}
      with pytest.raises(ValueError, match=r"Missing credential_source"):
          self.make_credentials(credential_source=unsupported_source)
  def test_constructor_invalid_options_url_and_file(self):
      """A credential source naming both ``url`` and ``file`` is rejected as ambiguous."""
      both_sources = {"url": self.CREDENTIAL_URL, "file": SUBJECT_TOKEN_TEXT_FILE}
      with pytest.raises(ValueError, match=r"Ambiguous credential_source"):
          self.make_credentials(credential_source=both_sources)
  def test_constructor_invalid_options_environment_id(self):
      """A credential source carrying ``environment_id`` is rejected."""
      source_with_env_id = {"url": self.CREDENTIAL_URL, "environment_id": "aws1"}
      expected_error = r"Invalid Identity Pool credential_source field 'environment_id'"
      with pytest.raises(ValueError, match=expected_error):
          self.make_credentials(credential_source=source_with_env_id)
  def test_constructor_invalid_credential_source(self):
      """A non-dict credential source is rejected as missing."""
      with pytest.raises(ValueError, match=r"Missing credential_source"):
          self.make_credentials(credential_source="non-dict")
  def test_constructor_invalid_credential_source_format_type(self):
      """A credential source whose format type is ``xml`` is rejected."""
      xml_source = {"format": {"type": "xml"}}
      with pytest.raises(ValueError, match=r"Invalid credential_source format 'xml'"):
          self.make_credentials(credential_source=xml_source)
  def test_constructor_missing_subject_token_field_name(self):
      """A JSON format without ``subject_token_field_name`` is rejected."""
      json_source_without_field = {"format": {"type": "json"}}
      expected_error = (
          r"Missing subject_token_field_name for JSON credential_source format"
      )
      with pytest.raises(ValueError, match=expected_error):
          self.make_credentials(credential_source=json_source_without_field)
  def test_info_with_workforce_pool_user_project(self):
      """``info`` includes ``workforce_pool_user_project`` for workforce credentials."""
      url_source = self.CREDENTIAL_SOURCE_TEXT_URL
      expected_info = {
          "type": "external_account",
          "audience": WORKFORCE_AUDIENCE,
          "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "token_info_url": TOKEN_INFO_URL,
          "credential_source": url_source,
          "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
          "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
      }

      # Hand the constructor a copy so the shared class-level dict is not
      # the object held by the credentials.
      credentials = self.make_credentials(
          audience=WORKFORCE_AUDIENCE,
          subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
          credential_source=url_source.copy(),
          workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
      )

      assert credentials.info == expected_info
  def test_info_with_file_credential_source(self):
      """``info`` echoes a file-backed credential source.

      Uses the file-based ``CREDENTIAL_SOURCE_TEXT`` so the test matches its
      name; it previously reused the URL-based source and so never covered
      ``info`` for a file credential source.
      """
      # Hand the constructor a copy so the shared class-level dict is not
      # the object held by the credentials.
      credentials = self.make_credentials(
          credential_source=self.CREDENTIAL_SOURCE_TEXT.copy()
      )

      assert credentials.info == {
          "type": "external_account",
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "token_info_url": TOKEN_INFO_URL,
          "credential_source": self.CREDENTIAL_SOURCE_TEXT,
          "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
      }
  def test_info_with_url_credential_source(self):
      """``info`` echoes a URL-backed JSON credential source."""
      json_url_source = self.CREDENTIAL_SOURCE_JSON_URL
      expected_info = {
          "type": "external_account",
          "audience": AUDIENCE,
          "subject_token_type": SUBJECT_TOKEN_TYPE,
          "token_url": TOKEN_URL,
          "token_info_url": TOKEN_INFO_URL,
          "credential_source": json_url_source,
          "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
      }

      # Hand the constructor a copy so the shared class-level dict is not
      # the object held by the credentials.
      credentials = self.make_credentials(credential_source=json_url_source.copy())

      assert credentials.info == expected_info
  def test_retrieve_subject_token_missing_subject_token(self, tmpdir):
      """An empty text credential file raises ``RefreshError``."""
      # Provide empty text file.
      empty_token_file = tmpdir.join("empty.txt")
      empty_token_file.write("")
      credentials = self.make_credentials(
          credential_source={"file": str(empty_token_file)}
      )

      expected_error = r"Missing subject_token in the credential_source file"
      with pytest.raises(exceptions.RefreshError, match=expected_error):
          credentials.retrieve_subject_token(None)
  def test_retrieve_subject_token_text_file(self):
      """The text-file source yields the expected subject token."""
      creds = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE_TEXT)

      token = creds.retrieve_subject_token(None)

      assert token == TEXT_FILE_SUBJECT_TOKEN
  def test_retrieve_subject_token_json_file(self):
      """The JSON-file source yields the expected subject token."""
      creds = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE_JSON)

      token = creds.retrieve_subject_token(None)

      assert token == JSON_FILE_SUBJECT_TOKEN
  def test_retrieve_subject_token_json_file_invalid_field_name(self):
      """A JSON source whose token field is absent must raise ``RefreshError``.

      The credential source points at the JSON fixture file but names a
      field ("not_found") to read the token from; retrieval is expected to
      fail with a message naming both the file and the key.
      """
      import re

      credential_source = {
          "file": SUBJECT_TOKEN_JSON_FILE,
          "format": {"type": "json", "subject_token_field_name": "not_found"},
      }
      credentials = self.make_credentials(credential_source=credential_source)

      with pytest.raises(exceptions.RefreshError) as excinfo:
          credentials.retrieve_subject_token(None)

      expected_message = (
          "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
              SUBJECT_TOKEN_JSON_FILE, "not_found"
          )
      )
      # excinfo.match() interprets its argument as a regular expression, so
      # the interpolated file name is escaped to be compared literally.
      assert excinfo.match(re.escape(expected_message))
  def test_retrieve_subject_token_invalid_json(self, tmpdir):
      """A malformed JSON credential file must raise ``RefreshError``.

      A temporary file containing only "{" is used as a JSON-format source;
      retrieval is expected to fail with a message naming the file and the
      requested key.
      """
      import re

      # Provide a file whose content is not valid JSON.
      invalid_json_file = tmpdir.join("invalid.json")
      invalid_json_file.write("{")
      credential_source = {
          "file": str(invalid_json_file),
          "format": {"type": "json", "subject_token_field_name": "access_token"},
      }
      credentials = self.make_credentials(credential_source=credential_source)

      with pytest.raises(exceptions.RefreshError) as excinfo:
          credentials.retrieve_subject_token(None)

      expected_message = (
          "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
              str(invalid_json_file), "access_token"
          )
      )
      # The temporary path is environment-dependent and may contain regex
      # metacharacters (for example backslashes on Windows); escape it so
      # excinfo.match() compares the message literally.
      assert excinfo.match(re.escape(expected_message))
  def test_retrieve_subject_token_file_not_found(self):
      """A credential source naming a missing file must raise ``RefreshError``."""
      creds = self.make_credentials(credential_source={"file": "./not_found.txt"})

      with pytest.raises(exceptions.RefreshError) as excinfo:
          creds.retrieve_subject_token(None)

      assert excinfo.match(r"File './not_found.txt' was not found")
  def test_token_info_url(self):
      """Credentials built from the JSON source expose ``TOKEN_INFO_URL``."""
      creds = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE_JSON)

      assert creds.token_info_url == TOKEN_INFO_URL
  def test_token_info_url_custom(self):
      """Each valid token URL base is accepted as a custom token info URL."""
      for base_url in VALID_TOKEN_URLS:
          creds = self.make_credentials(
              credential_source=self.CREDENTIAL_SOURCE_JSON.copy(),
              token_info_url=(base_url + "/introspect"),
          )

          # The property must echo back exactly what was configured.
          assert creds.token_info_url == base_url + "/introspect"
  def test_token_info_url_negative(self):
      """Passing ``token_info_url=None`` leaves the property falsy."""
      source_copy = self.CREDENTIAL_SOURCE_JSON.copy()
      creds = self.make_credentials(
          credential_source=source_copy, token_info_url=None
      )

      assert not creds.token_info_url
  def test_token_url_custom(self):
      """Each valid token URL base is accepted as a custom token URL."""
      for base_url in VALID_TOKEN_URLS:
          creds = self.make_credentials(
              credential_source=self.CREDENTIAL_SOURCE_JSON.copy(),
              token_url=(base_url + "/token"),
          )

          # The private attribute must hold exactly what was configured.
          assert creds._token_url == (base_url + "/token")
  def test_service_account_impersonation_url_custom(self):
      """Each valid impersonation URL base is accepted when customised."""
      for base_url in VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS:
          creds = self.make_credentials(
              credential_source=self.CREDENTIAL_SOURCE_JSON.copy(),
              service_account_impersonation_url=(
                  base_url + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE
              ),
          )

          # The private attribute must hold exactly what was configured.
          assert creds._service_account_impersonation_url == (
              base_url + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE
          )
  def test_refresh_text_file_success_without_impersonation_ignore_default_scopes(
      self,
  ):
      """Refresh from a text file source with client auth and explicit scopes.

      User-supplied scopes are expected to be used; the default scopes are
      expected to be ignored.
      """
      creds = self.make_credentials(
          # Text-format credential source.
          credential_source=self.CREDENTIAL_SOURCE_TEXT,
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          scopes=SCOPES,
          # Expected to be ignored because explicit scopes are supplied.
          default_scopes=["ignored"],
      )

      self.assert_underlying_credentials_refresh(
          credentials=creds,
          audience=AUDIENCE,
          subject_token_type=SUBJECT_TOKEN_TYPE,
          subject_token=TEXT_FILE_SUBJECT_TOKEN,
          token_url=TOKEN_URL,
          service_account_impersonation_url=None,
          basic_auth_encoding=BASIC_AUTH_ENCODING,
          quota_project_id=None,
          scopes=SCOPES,
          default_scopes=["ignored"],
          used_scopes=SCOPES,
      )
  def test_refresh_workforce_success_with_client_auth_without_impersonation(self):
      """Workforce refresh with client auth: the user project is not sent.

      A workforce pool user project is configured, but the refresh is
      expected to be verified with ``workforce_pool_user_project=None``.
      """
      creds = self.make_credentials(
          audience=WORKFORCE_AUDIENCE,
          subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
          # Text-format credential source.
          credential_source=self.CREDENTIAL_SOURCE_TEXT,
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          scopes=SCOPES,
          # Client auth takes precedence, so this value is ignored.
          workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
      )

      self.assert_underlying_credentials_refresh(
          credentials=creds,
          audience=WORKFORCE_AUDIENCE,
          subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
          subject_token=TEXT_FILE_SUBJECT_TOKEN,
          token_url=TOKEN_URL,
          service_account_impersonation_url=None,
          basic_auth_encoding=BASIC_AUTH_ENCODING,
          quota_project_id=None,
          scopes=SCOPES,
          used_scopes=SCOPES,
          workforce_pool_user_project=None,
      )
  def test_refresh_workforce_success_with_client_auth_and_no_workforce_project(self):
      """Workforce refresh with client auth and no user project configured."""
      creds = self.make_credentials(
          audience=WORKFORCE_AUDIENCE,
          subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
          # Text-format credential source.
          credential_source=self.CREDENTIAL_SOURCE_TEXT,
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          scopes=SCOPES,
          # Not required when client auth is in use.
          workforce_pool_user_project=None,
      )

      self.assert_underlying_credentials_refresh(
          credentials=creds,
          audience=WORKFORCE_AUDIENCE,
          subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
          subject_token=TEXT_FILE_SUBJECT_TOKEN,
          token_url=TOKEN_URL,
          service_account_impersonation_url=None,
          basic_auth_encoding=BASIC_AUTH_ENCODING,
          quota_project_id=None,
          scopes=SCOPES,
          used_scopes=SCOPES,
          workforce_pool_user_project=None,
      )
  def test_refresh_workforce_success_without_client_auth_without_impersonation(self):
      """Workforce refresh without client auth: the user project is kept.

      With no client id/secret, the refresh is expected to be verified with
      the configured workforce pool user project and no basic auth encoding.
      """
      creds = self.make_credentials(
          audience=WORKFORCE_AUDIENCE,
          subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
          # Text-format credential source.
          credential_source=self.CREDENTIAL_SOURCE_TEXT,
          client_id=None,
          client_secret=None,
          scopes=SCOPES,
          # Honoured here because client auth is not used.
          workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
      )

      self.assert_underlying_credentials_refresh(
          credentials=creds,
          audience=WORKFORCE_AUDIENCE,
          subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
          subject_token=TEXT_FILE_SUBJECT_TOKEN,
          token_url=TOKEN_URL,
          service_account_impersonation_url=None,
          basic_auth_encoding=None,
          quota_project_id=None,
          scopes=SCOPES,
          used_scopes=SCOPES,
          workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
      )
  def test_refresh_workforce_success_without_client_auth_with_impersonation(self):
      """Workforce refresh without client auth, via service account impersonation.

      With no client id/secret, the refresh is expected to be verified with
      the configured workforce pool user project and no basic auth encoding.
      """
      creds = self.make_credentials(
          audience=WORKFORCE_AUDIENCE,
          subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
          # Text-format credential source.
          credential_source=self.CREDENTIAL_SOURCE_TEXT,
          service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
          client_id=None,
          client_secret=None,
          scopes=SCOPES,
          # Honoured here because client auth is not used.
          workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
      )

      self.assert_underlying_credentials_refresh(
          credentials=creds,
          audience=WORKFORCE_AUDIENCE,
          subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
          subject_token=TEXT_FILE_SUBJECT_TOKEN,
          token_url=TOKEN_URL,
          service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
          basic_auth_encoding=None,
          quota_project_id=None,
          scopes=SCOPES,
          used_scopes=SCOPES,
          workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
      )
  def test_refresh_text_file_success_without_impersonation_use_default_scopes(self):
      """Refresh from a text file source falls back to the default scopes.

      No user scopes are supplied, so the default scopes are the ones
      expected to be used.
      """
      creds = self.make_credentials(
          # Text-format credential source.
          credential_source=self.CREDENTIAL_SOURCE_TEXT,
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          scopes=None,
          # Used because no explicit scopes were given.
          default_scopes=SCOPES,
      )

      self.assert_underlying_credentials_refresh(
          credentials=creds,
          audience=AUDIENCE,
          subject_token_type=SUBJECT_TOKEN_TYPE,
          subject_token=TEXT_FILE_SUBJECT_TOKEN,
          token_url=TOKEN_URL,
          service_account_impersonation_url=None,
          basic_auth_encoding=BASIC_AUTH_ENCODING,
          quota_project_id=None,
          scopes=None,
          default_scopes=SCOPES,
          used_scopes=SCOPES,
      )
  def test_refresh_text_file_success_with_impersonation_ignore_default_scopes(self):
      """Impersonated refresh from a text file source with explicit scopes.

      No client id/secret is passed and the refresh is verified with
      ``basic_auth_encoding=None``; the default scopes are expected to be
      ignored in favour of the explicit ones.
      """
      creds = self.make_credentials(
          # Text-format credential source.
          credential_source=self.CREDENTIAL_SOURCE_TEXT,
          service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=SCOPES,
          # Expected to be ignored because explicit scopes are supplied.
          default_scopes=["ignored"],
      )

      self.assert_underlying_credentials_refresh(
          credentials=creds,
          audience=AUDIENCE,
          subject_token_type=SUBJECT_TOKEN_TYPE,
          subject_token=TEXT_FILE_SUBJECT_TOKEN,
          token_url=TOKEN_URL,
          service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
          basic_auth_encoding=None,
          quota_project_id=None,
          scopes=SCOPES,
          default_scopes=["ignored"],
          used_scopes=SCOPES,
      )
  def test_refresh_text_file_success_with_impersonation_use_default_scopes(self):
      """Impersonated refresh from a text file source using default scopes.

      No client id/secret and no user scopes are passed; the refresh is
      verified with ``basic_auth_encoding=None`` and the default scopes as
      the scopes actually used.
      """
      creds = self.make_credentials(
          # Text-format credential source.
          credential_source=self.CREDENTIAL_SOURCE_TEXT,
          service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=None,
          # Used because no explicit scopes were given.
          default_scopes=SCOPES,
      )

      self.assert_underlying_credentials_refresh(
          credentials=creds,
          audience=AUDIENCE,
          subject_token_type=SUBJECT_TOKEN_TYPE,
          subject_token=TEXT_FILE_SUBJECT_TOKEN,
          token_url=TOKEN_URL,
          service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
          basic_auth_encoding=None,
          quota_project_id=None,
          scopes=None,
          default_scopes=SCOPES,
          used_scopes=SCOPES,
      )
  def test_refresh_json_file_success_without_impersonation(self):
      """Refresh from a JSON file source with client auth, no impersonation."""
      creds = self.make_credentials(
          # JSON-format credential source.
          credential_source=self.CREDENTIAL_SOURCE_JSON,
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          scopes=SCOPES,
      )

      self.assert_underlying_credentials_refresh(
          credentials=creds,
          audience=AUDIENCE,
          subject_token_type=SUBJECT_TOKEN_TYPE,
          subject_token=JSON_FILE_SUBJECT_TOKEN,
          token_url=TOKEN_URL,
          service_account_impersonation_url=None,
          basic_auth_encoding=BASIC_AUTH_ENCODING,
          quota_project_id=None,
          scopes=SCOPES,
          default_scopes=None,
          used_scopes=SCOPES,
      )
  def test_refresh_json_file_success_with_impersonation(self):
      """Impersonated refresh from a JSON file source.

      No client id/secret is passed and the refresh is verified with
      ``basic_auth_encoding=None``.
      """
      creds = self.make_credentials(
          # JSON-format credential source.
          credential_source=self.CREDENTIAL_SOURCE_JSON,
          service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=SCOPES,
      )

      self.assert_underlying_credentials_refresh(
          credentials=creds,
          audience=AUDIENCE,
          subject_token_type=SUBJECT_TOKEN_TYPE,
          subject_token=JSON_FILE_SUBJECT_TOKEN,
          token_url=TOKEN_URL,
          service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
          basic_auth_encoding=None,
          quota_project_id=None,
          scopes=SCOPES,
          default_scopes=None,
          used_scopes=SCOPES,
      )
  def test_refresh_with_retrieve_subject_token_error(self):
      """``refresh`` must surface a subject-token parsing failure.

      The credential source points at the JSON fixture file but names a
      field ("not_found") to read the token from; ``refresh`` is expected to
      raise ``RefreshError`` with a message naming both the file and the key.
      """
      import re

      credential_source = {
          "file": SUBJECT_TOKEN_JSON_FILE,
          "format": {"type": "json", "subject_token_field_name": "not_found"},
      }
      credentials = self.make_credentials(credential_source=credential_source)

      with pytest.raises(exceptions.RefreshError) as excinfo:
          credentials.refresh(None)

      expected_message = (
          "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
              SUBJECT_TOKEN_JSON_FILE, "not_found"
          )
      )
      # excinfo.match() interprets its argument as a regular expression, so
      # the interpolated file name is escaped to be compared literally.
      assert excinfo.match(re.escape(expected_message))
  def test_retrieve_subject_token_from_url(self):
      """A text URL source returns the token served by the mock request."""
      creds = self.make_credentials(
          credential_source=self.CREDENTIAL_SOURCE_TEXT_URL
      )
      mock_request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN)

      token = creds.retrieve_subject_token(mock_request)

      assert token == TEXT_FILE_SUBJECT_TOKEN
      # Inspect the keyword arguments of the first call; no headers expected.
      first_call_kwargs = mock_request.call_args_list[0][1]
      self.assert_credential_request_kwargs(first_call_kwargs, None)
  def test_retrieve_subject_token_from_url_with_headers(self):
      """A text URL source with headers returns the token and is checked
      against those headers."""
      source = {"url": self.CREDENTIAL_URL, "headers": {"foo": "bar"}}
      creds = self.make_credentials(credential_source=source)
      mock_request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN)

      token = creds.retrieve_subject_token(mock_request)

      assert token == TEXT_FILE_SUBJECT_TOKEN
      # Inspect the keyword arguments of the first call against the headers.
      first_call_kwargs = mock_request.call_args_list[0][1]
      self.assert_credential_request_kwargs(first_call_kwargs, {"foo": "bar"})
  def test_retrieve_subject_token_from_url_json(self):
      """A JSON URL source returns the token extracted from the response."""
      creds = self.make_credentials(
          credential_source=self.CREDENTIAL_SOURCE_JSON_URL
      )
      mock_request = self.make_mock_request(token_data=JSON_FILE_CONTENT)

      token = creds.retrieve_subject_token(mock_request)

      assert token == JSON_FILE_SUBJECT_TOKEN
      # Inspect the keyword arguments of the first call; no headers expected.
      first_call_kwargs = mock_request.call_args_list[0][1]
      self.assert_credential_request_kwargs(first_call_kwargs, None)
  def test_retrieve_subject_token_from_url_json_with_headers(self):
      """A JSON URL source with headers returns the token and is checked
      against those headers."""
      source = {
          "url": self.CREDENTIAL_URL,
          "format": {"type": "json", "subject_token_field_name": "access_token"},
          "headers": {"foo": "bar"},
      }
      creds = self.make_credentials(credential_source=source)
      mock_request = self.make_mock_request(token_data=JSON_FILE_CONTENT)

      token = creds.retrieve_subject_token(mock_request)

      assert token == JSON_FILE_SUBJECT_TOKEN
      # Inspect the keyword arguments of the first call against the headers.
      first_call_kwargs = mock_request.call_args_list[0][1]
      self.assert_credential_request_kwargs(first_call_kwargs, {"foo": "bar"})
  def test_retrieve_subject_token_from_url_not_found(self):
      """A 404 from the credential URL must raise ``RefreshError``."""
      creds = self.make_credentials(
          credential_source=self.CREDENTIAL_SOURCE_TEXT_URL
      )
      # Mock request configured with a 404 token status.
      failing_request = self.make_mock_request(
          token_status=404, token_data=JSON_FILE_CONTENT
      )

      with pytest.raises(exceptions.RefreshError) as excinfo:
          creds.retrieve_subject_token(failing_request)

      assert excinfo.match("Unable to retrieve Identity Pool subject token")
  def test_retrieve_subject_token_from_url_json_invalid_field(self):
      """A JSON URL source naming the field "not_found" must raise."""
      source = {
          "url": self.CREDENTIAL_URL,
          "format": {"type": "json", "subject_token_field_name": "not_found"},
      }
      creds = self.make_credentials(credential_source=source)
      mock_request = self.make_mock_request(token_data=JSON_FILE_CONTENT)

      with pytest.raises(exceptions.RefreshError) as excinfo:
          creds.retrieve_subject_token(mock_request)

      # The message is expected to name both the URL and the requested key.
      expected_pattern = (
          "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
              self.CREDENTIAL_URL, "not_found"
          )
      )
      assert excinfo.match(expected_pattern)
  def test_retrieve_subject_token_from_url_json_invalid_format(self):
      """A JSON URL source receiving the body "{" must raise ``RefreshError``."""
      creds = self.make_credentials(
          credential_source=self.CREDENTIAL_SOURCE_JSON_URL
      )
      # The mocked response body is not valid JSON.
      mock_request = self.make_mock_request(token_data="{")

      with pytest.raises(exceptions.RefreshError) as excinfo:
          creds.retrieve_subject_token(mock_request)

      # The message is expected to name both the URL and the requested key.
      expected_pattern = (
          "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
              self.CREDENTIAL_URL, "access_token"
          )
      )
      assert excinfo.match(expected_pattern)
  def test_refresh_text_file_success_without_impersonation_url(self):
      """Refresh from a text URL source with client auth, no impersonation."""
      creds = self.make_credentials(
          # Text-format URL credential source.
          credential_source=self.CREDENTIAL_SOURCE_TEXT_URL,
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          scopes=SCOPES,
      )

      self.assert_underlying_credentials_refresh(
          credentials=creds,
          audience=AUDIENCE,
          subject_token_type=SUBJECT_TOKEN_TYPE,
          subject_token=TEXT_FILE_SUBJECT_TOKEN,
          credential_data=TEXT_FILE_SUBJECT_TOKEN,
          token_url=TOKEN_URL,
          service_account_impersonation_url=None,
          basic_auth_encoding=BASIC_AUTH_ENCODING,
          quota_project_id=None,
          scopes=SCOPES,
          default_scopes=None,
          used_scopes=SCOPES,
      )
  def test_refresh_text_file_success_with_impersonation_url(self):
      """Impersonated refresh from a text URL source.

      No client id/secret is passed and the refresh is verified with
      ``basic_auth_encoding=None``.
      """
      creds = self.make_credentials(
          # Text-format URL credential source.
          credential_source=self.CREDENTIAL_SOURCE_TEXT_URL,
          service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=SCOPES,
      )

      self.assert_underlying_credentials_refresh(
          credentials=creds,
          audience=AUDIENCE,
          subject_token_type=SUBJECT_TOKEN_TYPE,
          subject_token=TEXT_FILE_SUBJECT_TOKEN,
          credential_data=TEXT_FILE_SUBJECT_TOKEN,
          token_url=TOKEN_URL,
          service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
          basic_auth_encoding=None,
          quota_project_id=None,
          scopes=SCOPES,
          default_scopes=None,
          used_scopes=SCOPES,
      )
  def test_refresh_json_file_success_without_impersonation_url(self):
      """Refresh from a JSON URL source with client auth, no impersonation."""
      creds = self.make_credentials(
          # JSON-format URL credential source.
          credential_source=self.CREDENTIAL_SOURCE_JSON_URL,
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          scopes=SCOPES,
      )

      self.assert_underlying_credentials_refresh(
          credentials=creds,
          audience=AUDIENCE,
          subject_token_type=SUBJECT_TOKEN_TYPE,
          subject_token=JSON_FILE_SUBJECT_TOKEN,
          credential_data=JSON_FILE_CONTENT,
          token_url=TOKEN_URL,
          service_account_impersonation_url=None,
          basic_auth_encoding=BASIC_AUTH_ENCODING,
          quota_project_id=None,
          scopes=SCOPES,
          default_scopes=None,
          used_scopes=SCOPES,
      )
  def test_refresh_json_file_success_with_impersonation_url(self):
      """Impersonated refresh from a JSON URL source.

      No client id/secret is passed and the refresh is verified with
      ``basic_auth_encoding=None``.
      """
      creds = self.make_credentials(
          # JSON-format URL credential source.
          credential_source=self.CREDENTIAL_SOURCE_JSON_URL,
          service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=SCOPES,
      )

      self.assert_underlying_credentials_refresh(
          credentials=creds,
          audience=AUDIENCE,
          subject_token_type=SUBJECT_TOKEN_TYPE,
          subject_token=JSON_FILE_SUBJECT_TOKEN,
          credential_data=JSON_FILE_CONTENT,
          token_url=TOKEN_URL,
          service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
          basic_auth_encoding=None,
          quota_project_id=None,
          scopes=SCOPES,
          default_scopes=None,
          used_scopes=SCOPES,
      )
  def test_refresh_with_retrieve_subject_token_error_url(self):
      """``refresh`` must surface a token parsing failure for a URL source."""
      source = {
          "url": self.CREDENTIAL_URL,
          "format": {"type": "json", "subject_token_field_name": "not_found"},
      }
      creds = self.make_credentials(credential_source=source)
      mock_request = self.make_mock_request(token_data=JSON_FILE_CONTENT)

      with pytest.raises(exceptions.RefreshError) as excinfo:
          creds.refresh(mock_request)

      # The message is expected to name both the URL and the requested key.
      expected_pattern = (
          "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
              self.CREDENTIAL_URL, "not_found"
          )
      )
      assert excinfo.match(expected_pattern)