test_external_account.py 78 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927
  1. # Copyright 2020 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import datetime
  15. import http.client as http_client
  16. import json
  17. import urllib
  18. import mock
  19. import pytest # type: ignore
  20. from google.auth import _helpers
  21. from google.auth import exceptions
  22. from google.auth import external_account
  23. from google.auth import transport
  24. from google.auth.credentials import DEFAULT_UNIVERSE_DOMAIN
  25. from google.auth.credentials import TokenState
  26. IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
  27. "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/imp"
  28. )
  29. LANG_LIBRARY_METRICS_HEADER_VALUE = "gl-python/3.7 auth/1.1"
  30. CLIENT_ID = "username"
  31. CLIENT_SECRET = "password"
  32. # Base64 encoding of "username:password"
  33. BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
  34. SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
  35. # List of valid workforce pool audiences.
  36. TEST_USER_AUDIENCES = [
  37. "//iam.googleapis.com/locations/global/workforcePools/pool-id/providers/provider-id",
  38. "//iam.googleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
  39. "//iam.googleapis.com/locations/eu/workforcePools/workloadIdentityPools/providers/provider-id",
  40. ]
  41. # Workload identity pool audiences or invalid workforce pool audiences.
  42. TEST_NON_USER_AUDIENCES = [
  43. # Legacy K8s audience format.
  44. "identitynamespace:1f12345:my_provider",
  45. (
  46. "//iam.googleapis.com/projects/123456/locations/"
  47. "global/workloadIdentityPools/pool-id/providers/"
  48. "provider-id"
  49. ),
  50. (
  51. "//iam.googleapis.com/projects/123456/locations/"
  52. "eu/workloadIdentityPools/pool-id/providers/"
  53. "provider-id"
  54. ),
  55. # Pool ID with workforcePools string.
  56. (
  57. "//iam.googleapis.com/projects/123456/locations/"
  58. "global/workloadIdentityPools/workforcePools/providers/"
  59. "provider-id"
  60. ),
  61. # Unrealistic / incorrect workforce pool audiences.
  62. "//iamgoogleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
  63. "//iam.googleapiscom/locations/eu/workforcePools/pool-id/providers/provider-id",
  64. "//iam.googleapis.com/locations/workforcePools/pool-id/providers/provider-id",
  65. "//iam.googleapis.com/locations/eu/workforcePool/pool-id/providers/provider-id",
  66. "//iam.googleapis.com/locations//workforcePool/pool-id/providers/provider-id",
  67. ]
  68. class CredentialsImpl(external_account.Credentials):
  69. def __init__(self, **kwargs):
  70. super(CredentialsImpl, self).__init__(**kwargs)
  71. self._counter = 0
  72. def retrieve_subject_token(self, request):
  73. counter = self._counter
  74. self._counter += 1
  75. return "subject_token_{}".format(counter)
  76. class TestCredentials(object):
  77. TOKEN_URL = "https://sts.googleapis.com/v1/token"
  78. TOKEN_INFO_URL = "https://sts.googleapis.com/v1/introspect"
  79. PROJECT_NUMBER = "123456"
  80. POOL_ID = "POOL_ID"
  81. PROVIDER_ID = "PROVIDER_ID"
  82. AUDIENCE = (
  83. "//iam.googleapis.com/projects/{}"
  84. "/locations/global/workloadIdentityPools/{}"
  85. "/providers/{}"
  86. ).format(PROJECT_NUMBER, POOL_ID, PROVIDER_ID)
  87. WORKFORCE_AUDIENCE = (
  88. "//iam.googleapis.com/locations/global/workforcePools/{}/providers/{}"
  89. ).format(POOL_ID, PROVIDER_ID)
  90. WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER"
  91. SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
  92. WORKFORCE_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token"
  93. CREDENTIAL_SOURCE = {"file": "/var/run/secrets/goog.id/token"}
  94. SUCCESS_RESPONSE = {
  95. "access_token": "ACCESS_TOKEN",
  96. "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
  97. "token_type": "Bearer",
  98. "expires_in": 3600,
  99. "scope": "scope1 scope2",
  100. }
  101. ERROR_RESPONSE = {
  102. "error": "invalid_request",
  103. "error_description": "Invalid subject token",
  104. "error_uri": "https://tools.ietf.org/html/rfc6749",
  105. }
  106. QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
  107. SERVICE_ACCOUNT_IMPERSONATION_URL = (
  108. "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
  109. + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
  110. )
  111. SCOPES = ["scope1", "scope2"]
  112. IMPERSONATION_ERROR_RESPONSE = {
  113. "error": {
  114. "code": 400,
  115. "message": "Request contains an invalid argument",
  116. "status": "INVALID_ARGUMENT",
  117. }
  118. }
  119. PROJECT_ID = "my-proj-id"
  120. CLOUD_RESOURCE_MANAGER_URL = (
  121. "https://cloudresourcemanager.googleapis.com/v1/projects/"
  122. )
  123. CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE = {
  124. "projectNumber": PROJECT_NUMBER,
  125. "projectId": PROJECT_ID,
  126. "lifecycleState": "ACTIVE",
  127. "name": "project-name",
  128. "createTime": "2018-11-06T04:42:54.109Z",
  129. "parent": {"type": "folder", "id": "12345678901"},
  130. }
  131. @classmethod
  132. def make_credentials(
  133. cls,
  134. client_id=None,
  135. client_secret=None,
  136. quota_project_id=None,
  137. token_info_url=None,
  138. scopes=None,
  139. default_scopes=None,
  140. service_account_impersonation_url=None,
  141. service_account_impersonation_options={},
  142. universe_domain=DEFAULT_UNIVERSE_DOMAIN,
  143. ):
  144. return CredentialsImpl(
  145. audience=cls.AUDIENCE,
  146. subject_token_type=cls.SUBJECT_TOKEN_TYPE,
  147. token_url=cls.TOKEN_URL,
  148. token_info_url=token_info_url,
  149. service_account_impersonation_url=service_account_impersonation_url,
  150. service_account_impersonation_options=service_account_impersonation_options,
  151. credential_source=cls.CREDENTIAL_SOURCE,
  152. client_id=client_id,
  153. client_secret=client_secret,
  154. quota_project_id=quota_project_id,
  155. scopes=scopes,
  156. default_scopes=default_scopes,
  157. universe_domain=universe_domain,
  158. )
  159. @classmethod
  160. def make_workforce_pool_credentials(
  161. cls,
  162. client_id=None,
  163. client_secret=None,
  164. quota_project_id=None,
  165. scopes=None,
  166. default_scopes=None,
  167. service_account_impersonation_url=None,
  168. workforce_pool_user_project=None,
  169. ):
  170. return CredentialsImpl(
  171. audience=cls.WORKFORCE_AUDIENCE,
  172. subject_token_type=cls.WORKFORCE_SUBJECT_TOKEN_TYPE,
  173. token_url=cls.TOKEN_URL,
  174. service_account_impersonation_url=service_account_impersonation_url,
  175. credential_source=cls.CREDENTIAL_SOURCE,
  176. client_id=client_id,
  177. client_secret=client_secret,
  178. quota_project_id=quota_project_id,
  179. scopes=scopes,
  180. default_scopes=default_scopes,
  181. workforce_pool_user_project=workforce_pool_user_project,
  182. )
  183. @classmethod
  184. def make_mock_request(
  185. cls,
  186. status=http_client.OK,
  187. data=None,
  188. impersonation_status=None,
  189. impersonation_data=None,
  190. cloud_resource_manager_status=None,
  191. cloud_resource_manager_data=None,
  192. ):
  193. # STS token exchange request.
  194. token_response = mock.create_autospec(transport.Response, instance=True)
  195. token_response.status = status
  196. token_response.data = json.dumps(data).encode("utf-8")
  197. responses = [token_response]
  198. # If service account impersonation is requested, mock the expected response.
  199. if impersonation_status:
  200. impersonation_response = mock.create_autospec(
  201. transport.Response, instance=True
  202. )
  203. impersonation_response.status = impersonation_status
  204. impersonation_response.data = json.dumps(impersonation_data).encode("utf-8")
  205. responses.append(impersonation_response)
  206. # If cloud resource manager is requested, mock the expected response.
  207. if cloud_resource_manager_status:
  208. cloud_resource_manager_response = mock.create_autospec(
  209. transport.Response, instance=True
  210. )
  211. cloud_resource_manager_response.status = cloud_resource_manager_status
  212. cloud_resource_manager_response.data = json.dumps(
  213. cloud_resource_manager_data
  214. ).encode("utf-8")
  215. responses.append(cloud_resource_manager_response)
  216. request = mock.create_autospec(transport.Request)
  217. request.side_effect = responses
  218. return request
  219. @classmethod
  220. def assert_token_request_kwargs(cls, request_kwargs, headers, request_data):
  221. assert request_kwargs["url"] == cls.TOKEN_URL
  222. assert request_kwargs["method"] == "POST"
  223. assert request_kwargs["headers"] == headers
  224. assert request_kwargs["body"] is not None
  225. body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
  226. for (k, v) in body_tuples:
  227. assert v.decode("utf-8") == request_data[k.decode("utf-8")]
  228. assert len(body_tuples) == len(request_data.keys())
  229. @classmethod
  230. def assert_impersonation_request_kwargs(cls, request_kwargs, headers, request_data):
  231. assert request_kwargs["url"] == cls.SERVICE_ACCOUNT_IMPERSONATION_URL
  232. assert request_kwargs["method"] == "POST"
  233. assert request_kwargs["headers"] == headers
  234. assert request_kwargs["body"] is not None
  235. body_json = json.loads(request_kwargs["body"].decode("utf-8"))
  236. assert body_json == request_data
  237. @classmethod
  238. def assert_resource_manager_request_kwargs(
  239. cls, request_kwargs, project_number, headers
  240. ):
  241. assert request_kwargs["url"] == cls.CLOUD_RESOURCE_MANAGER_URL + project_number
  242. assert request_kwargs["method"] == "GET"
  243. assert request_kwargs["headers"] == headers
  244. assert "body" not in request_kwargs
  245. def test_default_state(self):
  246. credentials = self.make_credentials(
  247. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
  248. )
  249. # Token url and service account impersonation url should be set
  250. assert credentials._token_url
  251. assert credentials._service_account_impersonation_url
  252. # Not token acquired yet
  253. assert not credentials.token
  254. assert not credentials.valid
  255. # Expiration hasn't been set yet
  256. assert not credentials.expiry
  257. assert not credentials.expired
  258. # Scopes are required
  259. assert not credentials.scopes
  260. assert credentials.requires_scopes
  261. assert not credentials.quota_project_id
  262. # Token info url not set yet
  263. assert not credentials.token_info_url
  264. def test_nonworkforce_with_workforce_pool_user_project(self):
  265. with pytest.raises(ValueError) as excinfo:
  266. CredentialsImpl(
  267. audience=self.AUDIENCE,
  268. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  269. token_url=self.TOKEN_URL,
  270. credential_source=self.CREDENTIAL_SOURCE,
  271. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
  272. )
  273. assert excinfo.match(
  274. "workforce_pool_user_project should not be set for non-workforce "
  275. "pool credentials"
  276. )
  277. def test_with_scopes(self):
  278. credentials = self.make_credentials()
  279. assert not credentials.scopes
  280. assert credentials.requires_scopes
  281. scoped_credentials = credentials.with_scopes(["email"])
  282. assert scoped_credentials.has_scopes(["email"])
  283. assert not scoped_credentials.requires_scopes
  284. def test_with_scopes_workforce_pool(self):
  285. credentials = self.make_workforce_pool_credentials(
  286. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
  287. )
  288. assert not credentials.scopes
  289. assert credentials.requires_scopes
  290. scoped_credentials = credentials.with_scopes(["email"])
  291. assert scoped_credentials.has_scopes(["email"])
  292. assert not scoped_credentials.requires_scopes
  293. assert (
  294. scoped_credentials.info.get("workforce_pool_user_project")
  295. == self.WORKFORCE_POOL_USER_PROJECT
  296. )
  297. def test_with_scopes_using_user_and_default_scopes(self):
  298. credentials = self.make_credentials()
  299. assert not credentials.scopes
  300. assert credentials.requires_scopes
  301. scoped_credentials = credentials.with_scopes(
  302. ["email"], default_scopes=["profile"]
  303. )
  304. assert scoped_credentials.has_scopes(["email"])
  305. assert not scoped_credentials.has_scopes(["profile"])
  306. assert not scoped_credentials.requires_scopes
  307. assert scoped_credentials.scopes == ["email"]
  308. assert scoped_credentials.default_scopes == ["profile"]
  309. def test_with_scopes_using_default_scopes_only(self):
  310. credentials = self.make_credentials()
  311. assert not credentials.scopes
  312. assert credentials.requires_scopes
  313. scoped_credentials = credentials.with_scopes(None, default_scopes=["profile"])
  314. assert scoped_credentials.has_scopes(["profile"])
  315. assert not scoped_credentials.requires_scopes
  316. def test_with_scopes_full_options_propagated(self):
  317. credentials = self.make_credentials(
  318. client_id=CLIENT_ID,
  319. client_secret=CLIENT_SECRET,
  320. quota_project_id=self.QUOTA_PROJECT_ID,
  321. scopes=self.SCOPES,
  322. token_info_url=self.TOKEN_INFO_URL,
  323. default_scopes=["default1"],
  324. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  325. service_account_impersonation_options={"token_lifetime_seconds": 2800},
  326. )
  327. with mock.patch.object(
  328. external_account.Credentials, "__init__", return_value=None
  329. ) as mock_init:
  330. credentials.with_scopes(["email"], ["default2"])
  331. # Confirm with_scopes initialized the credential with the expected
  332. # parameters and scopes.
  333. mock_init.assert_called_once_with(
  334. audience=self.AUDIENCE,
  335. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  336. token_url=self.TOKEN_URL,
  337. token_info_url=self.TOKEN_INFO_URL,
  338. credential_source=self.CREDENTIAL_SOURCE,
  339. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  340. service_account_impersonation_options={"token_lifetime_seconds": 2800},
  341. client_id=CLIENT_ID,
  342. client_secret=CLIENT_SECRET,
  343. quota_project_id=self.QUOTA_PROJECT_ID,
  344. scopes=["email"],
  345. default_scopes=["default2"],
  346. universe_domain=DEFAULT_UNIVERSE_DOMAIN,
  347. )
  348. def test_with_token_uri(self):
  349. credentials = self.make_credentials()
  350. new_token_uri = "https://eu-sts.googleapis.com/v1/token"
  351. assert credentials._token_url == self.TOKEN_URL
  352. creds_with_new_token_uri = credentials.with_token_uri(new_token_uri)
  353. assert creds_with_new_token_uri._token_url == new_token_uri
  354. def test_with_token_uri_workforce_pool(self):
  355. credentials = self.make_workforce_pool_credentials(
  356. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
  357. )
  358. new_token_uri = "https://eu-sts.googleapis.com/v1/token"
  359. assert credentials._token_url == self.TOKEN_URL
  360. creds_with_new_token_uri = credentials.with_token_uri(new_token_uri)
  361. assert creds_with_new_token_uri._token_url == new_token_uri
  362. assert (
  363. creds_with_new_token_uri.info.get("workforce_pool_user_project")
  364. == self.WORKFORCE_POOL_USER_PROJECT
  365. )
  366. def test_with_quota_project(self):
  367. credentials = self.make_credentials()
  368. assert not credentials.scopes
  369. assert not credentials.quota_project_id
  370. quota_project_creds = credentials.with_quota_project("project-foo")
  371. assert quota_project_creds.quota_project_id == "project-foo"
  372. def test_with_quota_project_workforce_pool(self):
  373. credentials = self.make_workforce_pool_credentials(
  374. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
  375. )
  376. assert not credentials.scopes
  377. assert not credentials.quota_project_id
  378. quota_project_creds = credentials.with_quota_project("project-foo")
  379. assert quota_project_creds.quota_project_id == "project-foo"
  380. assert (
  381. quota_project_creds.info.get("workforce_pool_user_project")
  382. == self.WORKFORCE_POOL_USER_PROJECT
  383. )
  384. def test_with_quota_project_full_options_propagated(self):
  385. credentials = self.make_credentials(
  386. client_id=CLIENT_ID,
  387. client_secret=CLIENT_SECRET,
  388. token_info_url=self.TOKEN_INFO_URL,
  389. quota_project_id=self.QUOTA_PROJECT_ID,
  390. scopes=self.SCOPES,
  391. default_scopes=["default1"],
  392. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  393. service_account_impersonation_options={"token_lifetime_seconds": 2800},
  394. )
  395. with mock.patch.object(
  396. external_account.Credentials, "__init__", return_value=None
  397. ) as mock_init:
  398. credentials.with_quota_project("project-foo")
  399. # Confirm with_quota_project initialized the credential with the
  400. # expected parameters and quota project ID.
  401. mock_init.assert_called_once_with(
  402. audience=self.AUDIENCE,
  403. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  404. token_url=self.TOKEN_URL,
  405. token_info_url=self.TOKEN_INFO_URL,
  406. credential_source=self.CREDENTIAL_SOURCE,
  407. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  408. service_account_impersonation_options={"token_lifetime_seconds": 2800},
  409. client_id=CLIENT_ID,
  410. client_secret=CLIENT_SECRET,
  411. quota_project_id="project-foo",
  412. scopes=self.SCOPES,
  413. default_scopes=["default1"],
  414. universe_domain=DEFAULT_UNIVERSE_DOMAIN,
  415. )
  416. def test_info(self):
  417. credentials = self.make_credentials(universe_domain="dummy_universe.com")
  418. assert credentials.info == {
  419. "type": "external_account",
  420. "audience": self.AUDIENCE,
  421. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  422. "token_url": self.TOKEN_URL,
  423. "credential_source": self.CREDENTIAL_SOURCE.copy(),
  424. "universe_domain": "dummy_universe.com",
  425. }
  426. def test_universe_domain(self):
  427. credentials = self.make_credentials(universe_domain="dummy_universe.com")
  428. assert credentials.universe_domain == "dummy_universe.com"
  429. credentials = self.make_credentials()
  430. assert credentials.universe_domain == DEFAULT_UNIVERSE_DOMAIN
  431. def test_with_universe_domain(self):
  432. credentials = self.make_credentials()
  433. new_credentials = credentials.with_universe_domain("dummy_universe.com")
  434. assert new_credentials.universe_domain == "dummy_universe.com"
  435. def test_info_workforce_pool(self):
  436. credentials = self.make_workforce_pool_credentials(
  437. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
  438. )
  439. assert credentials.info == {
  440. "type": "external_account",
  441. "audience": self.WORKFORCE_AUDIENCE,
  442. "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
  443. "token_url": self.TOKEN_URL,
  444. "credential_source": self.CREDENTIAL_SOURCE.copy(),
  445. "workforce_pool_user_project": self.WORKFORCE_POOL_USER_PROJECT,
  446. "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
  447. }
  448. def test_info_with_full_options(self):
  449. credentials = self.make_credentials(
  450. client_id=CLIENT_ID,
  451. client_secret=CLIENT_SECRET,
  452. quota_project_id=self.QUOTA_PROJECT_ID,
  453. token_info_url=self.TOKEN_INFO_URL,
  454. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  455. service_account_impersonation_options={"token_lifetime_seconds": 2800},
  456. )
  457. assert credentials.info == {
  458. "type": "external_account",
  459. "audience": self.AUDIENCE,
  460. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  461. "token_url": self.TOKEN_URL,
  462. "token_info_url": self.TOKEN_INFO_URL,
  463. "service_account_impersonation_url": self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  464. "service_account_impersonation": {"token_lifetime_seconds": 2800},
  465. "credential_source": self.CREDENTIAL_SOURCE.copy(),
  466. "quota_project_id": self.QUOTA_PROJECT_ID,
  467. "client_id": CLIENT_ID,
  468. "client_secret": CLIENT_SECRET,
  469. "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
  470. }
  471. def test_service_account_email_without_impersonation(self):
  472. credentials = self.make_credentials()
  473. assert credentials.service_account_email is None
  474. def test_service_account_email_with_impersonation(self):
  475. credentials = self.make_credentials(
  476. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
  477. )
  478. assert credentials.service_account_email == SERVICE_ACCOUNT_EMAIL
  479. @pytest.mark.parametrize("audience", TEST_NON_USER_AUDIENCES)
  480. def test_is_user_with_non_users(self, audience):
  481. credentials = CredentialsImpl(
  482. audience=audience,
  483. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  484. token_url=self.TOKEN_URL,
  485. credential_source=self.CREDENTIAL_SOURCE,
  486. )
  487. assert credentials.is_user is False
  488. @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
  489. def test_is_user_with_users(self, audience):
  490. credentials = CredentialsImpl(
  491. audience=audience,
  492. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  493. token_url=self.TOKEN_URL,
  494. credential_source=self.CREDENTIAL_SOURCE,
  495. )
  496. assert credentials.is_user is True
  497. @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
  498. def test_is_user_with_users_and_impersonation(self, audience):
  499. # Initialize the credentials with service account impersonation.
  500. credentials = CredentialsImpl(
  501. audience=audience,
  502. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  503. token_url=self.TOKEN_URL,
  504. credential_source=self.CREDENTIAL_SOURCE,
  505. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  506. )
  507. # Even though the audience is for a workforce pool, since service account
  508. # impersonation is used, the credentials will represent a service account and
  509. # not a user.
  510. assert credentials.is_user is False
  511. @pytest.mark.parametrize("audience", TEST_NON_USER_AUDIENCES)
  512. def test_is_workforce_pool_with_non_users(self, audience):
  513. credentials = CredentialsImpl(
  514. audience=audience,
  515. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  516. token_url=self.TOKEN_URL,
  517. credential_source=self.CREDENTIAL_SOURCE,
  518. )
  519. assert credentials.is_workforce_pool is False
  520. @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
  521. def test_is_workforce_pool_with_users(self, audience):
  522. credentials = CredentialsImpl(
  523. audience=audience,
  524. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  525. token_url=self.TOKEN_URL,
  526. credential_source=self.CREDENTIAL_SOURCE,
  527. )
  528. assert credentials.is_workforce_pool is True
  529. @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
  530. def test_is_workforce_pool_with_users_and_impersonation(self, audience):
  531. # Initialize the credentials with workforce audience and service account
  532. # impersonation.
  533. credentials = CredentialsImpl(
  534. audience=audience,
  535. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  536. token_url=self.TOKEN_URL,
  537. credential_source=self.CREDENTIAL_SOURCE,
  538. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  539. )
  540. # Even though impersonation is used, is_workforce_pool should still return True.
  541. assert credentials.is_workforce_pool is True
  @pytest.mark.parametrize("mock_expires_in", [2800, "2800"])
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  def test_refresh_without_client_auth_success(
      self, unused_utcnow, mock_auth_lib_value, mock_expires_in
  ):
      """Refresh without client auth stores the token and the custom expiry.

      ``expires_in`` is supplied both as an integer and as a numeric string.
      """
      # Override the lifetime so the resulting expiry can be verified.
      sts_response = self.SUCCESS_RESPONSE.copy()
      sts_response["expires_in"] = mock_expires_in
      token_lifetime = datetime.timedelta(seconds=int(mock_expires_in))
      expected_expiry = datetime.datetime.min + token_lifetime

      expected_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
      }
      expected_body = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
      }

      fake_request = self.make_mock_request(status=http_client.OK, data=sts_response)
      creds = self.make_credentials()
      creds.refresh(fake_request)

      sent_kwargs = fake_request.call_args[1]
      self.assert_token_request_kwargs(sent_kwargs, expected_headers, expected_body)
      assert creds.valid
      assert creds.expiry == expected_expiry
      assert not creds.expired
      assert creds.token == sts_response["access_token"]
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  def test_refresh_workforce_without_client_auth_success(
      self, unused_utcnow, test_auth_lib_value
  ):
      """Workforce refresh without client auth forwards the user project.

      The expected token exchange body carries the workforce pool user
      project, URL-quoted as JSON, in its ``options`` field.
      """
      # Override the lifetime so the resulting expiry can be verified.
      sts_response = self.SUCCESS_RESPONSE.copy()
      sts_response["expires_in"] = 2800
      token_lifetime = datetime.timedelta(seconds=sts_response["expires_in"])
      expected_expiry = datetime.datetime.min + token_lifetime

      expected_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
      }
      quoted_options = urllib.parse.quote(
          json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT})
      )
      expected_body = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.WORKFORCE_AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
          "options": quoted_options,
      }

      fake_request = self.make_mock_request(status=http_client.OK, data=sts_response)
      workforce_creds = self.make_workforce_pool_credentials(
          workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
      )
      workforce_creds.refresh(fake_request)

      sent_kwargs = fake_request.call_args[1]
      self.assert_token_request_kwargs(sent_kwargs, expected_headers, expected_body)
      assert workforce_creds.valid
      assert workforce_creds.expiry == expected_expiry
      assert not workforce_creds.expired
      assert workforce_creds.token == sts_response["access_token"]
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  def test_refresh_workforce_with_client_auth_success(
      self, unused_utcnow, mock_auth_lib_value
  ):
      """Workforce refresh with client auth sends basic auth, not ``options``.

      A user project is configured as well, but the expected request body
      contains no ``options`` entry.
      """
      # Override the lifetime so the resulting expiry can be verified.
      sts_response = self.SUCCESS_RESPONSE.copy()
      sts_response["expires_in"] = 2800
      token_lifetime = datetime.timedelta(seconds=sts_response["expires_in"])
      expected_expiry = datetime.datetime.min + token_lifetime

      expected_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
      }
      expected_body = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.WORKFORCE_AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
      }

      fake_request = self.make_mock_request(status=http_client.OK, data=sts_response)
      # Client auth takes priority over workforce_pool_user_project.
      workforce_creds = self.make_workforce_pool_credentials(
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
      )
      workforce_creds.refresh(fake_request)

      sent_kwargs = fake_request.call_args[1]
      self.assert_token_request_kwargs(sent_kwargs, expected_headers, expected_body)
      assert workforce_creds.valid
      assert workforce_creds.expiry == expected_expiry
      assert not workforce_creds.expired
      assert workforce_creds.token == sts_response["access_token"]
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  def test_refresh_workforce_with_client_auth_and_no_workforce_project_success(
      self, unused_utcnow, mock_lib_version_value
  ):
      """Workforce refresh succeeds with client auth and no user project.

      The expected request carries the basic auth header and no ``options``.
      """
      # Override the lifetime so the resulting expiry can be verified.
      sts_response = self.SUCCESS_RESPONSE.copy()
      sts_response["expires_in"] = 2800
      token_lifetime = datetime.timedelta(seconds=sts_response["expires_in"])
      expected_expiry = datetime.datetime.min + token_lifetime

      expected_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
      }
      expected_body = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.WORKFORCE_AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
      }

      fake_request = self.make_mock_request(status=http_client.OK, data=sts_response)
      # Client auth alone is sufficient for user project determination.
      workforce_creds = self.make_workforce_pool_credentials(
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          workforce_pool_user_project=None,
      )
      workforce_creds.refresh(fake_request)

      sent_kwargs = fake_request.call_args[1]
      self.assert_token_request_kwargs(sent_kwargs, expected_headers, expected_body)
      assert workforce_creds.valid
      assert workforce_creds.expiry == expected_expiry
      assert not workforce_creds.expired
      assert workforce_creds.token == sts_response["access_token"]
  # NOTE: patch decorators are applied bottom-up, so the lowest decorator
  # supplies the first mock argument. The order below keeps each argument
  # name aligned with the function it actually mocks.
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  @mock.patch(
      "google.auth.metrics.token_request_access_token_impersonate",
      return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  )
  def test_refresh_impersonation_without_client_auth_success(
      self, mock_metrics_header_value, mock_auth_lib_value
  ):
      """Refresh with impersonation and no client auth issues two requests.

      The first request is the STS token exchange, the second is the service
      account impersonation call. The credentials end up holding the
      impersonated access token and its expiry.

      Args:
          mock_metrics_header_value: Mock for
              ``google.auth.metrics.token_request_access_token_impersonate``.
          mock_auth_lib_value: Mock for
              ``google.auth.metrics.python_and_auth_lib_version``.
      """
      # Simulate service account access token expires in 2800 seconds.
      expire_time = (
          _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
      ).isoformat("T") + "Z"
      expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")

      # STS token exchange request/response.
      token_response = self.SUCCESS_RESPONSE.copy()
      token_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
      }
      token_request_data = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
          "scope": "https://www.googleapis.com/auth/iam",
      }

      # Service account impersonation request/response.
      impersonation_response = {
          "accessToken": "SA_ACCESS_TOKEN",
          "expireTime": expire_time,
      }
      impersonation_headers = {
          "Content-Type": "application/json",
          "authorization": "Bearer {}".format(token_response["access_token"]),
          "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
          "x-allowed-locations": "0x0",
      }
      impersonation_request_data = {
          "delegates": None,
          "scope": self.SCOPES,
          "lifetime": "3600s",
      }

      # Initialize mock request to handle token exchange and service account
      # impersonation request.
      request = self.make_mock_request(
          status=http_client.OK,
          data=token_response,
          impersonation_status=http_client.OK,
          impersonation_data=impersonation_response,
      )
      # Initialize credentials with service account impersonation.
      credentials = self.make_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=self.SCOPES,
      )

      credentials.refresh(request)

      # Only 2 requests should be processed.
      assert len(request.call_args_list) == 2
      # Verify token exchange request parameters.
      self.assert_token_request_kwargs(
          request.call_args_list[0][1], token_headers, token_request_data
      )
      # Verify service account impersonation request parameters.
      self.assert_impersonation_request_kwargs(
          request.call_args_list[1][1],
          impersonation_headers,
          impersonation_request_data,
      )
      assert credentials.valid
      assert credentials.expiry == expected_expiry
      assert not credentials.expired
      assert credentials.token == impersonation_response["accessToken"]
  # NOTE: patch decorators are applied bottom-up, so the lowest decorator
  # supplies the first mock argument. The order below keeps each argument
  # name aligned with the function it actually mocks.
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  @mock.patch(
      "google.auth.metrics.token_request_access_token_impersonate",
      return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  )
  def test_refresh_workforce_impersonation_without_client_auth_success(
      self, mock_metrics_header_value, mock_auth_lib_value
  ):
      """Workforce refresh with impersonation and no client auth.

      The STS token exchange is expected to carry the workforce pool user
      project in ``options``; the impersonation call then yields the token
      and expiry stored on the credentials.

      Args:
          mock_metrics_header_value: Mock for
              ``google.auth.metrics.token_request_access_token_impersonate``.
          mock_auth_lib_value: Mock for
              ``google.auth.metrics.python_and_auth_lib_version``.
      """
      # Simulate service account access token expires in 2800 seconds.
      expire_time = (
          _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
      ).isoformat("T") + "Z"
      expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")

      # STS token exchange request/response.
      token_response = self.SUCCESS_RESPONSE.copy()
      token_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
      }
      token_request_data = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.WORKFORCE_AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
          "scope": "https://www.googleapis.com/auth/iam",
          "options": urllib.parse.quote(
              json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT})
          ),
      }

      # Service account impersonation request/response.
      impersonation_response = {
          "accessToken": "SA_ACCESS_TOKEN",
          "expireTime": expire_time,
      }
      impersonation_headers = {
          "Content-Type": "application/json",
          "authorization": "Bearer {}".format(token_response["access_token"]),
          "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
          "x-allowed-locations": "0x0",
      }
      impersonation_request_data = {
          "delegates": None,
          "scope": self.SCOPES,
          "lifetime": "3600s",
      }

      # Initialize mock request to handle token exchange and service account
      # impersonation request.
      request = self.make_mock_request(
          status=http_client.OK,
          data=token_response,
          impersonation_status=http_client.OK,
          impersonation_data=impersonation_response,
      )
      # Initialize credentials with service account impersonation.
      credentials = self.make_workforce_pool_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=self.SCOPES,
          workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
      )

      credentials.refresh(request)

      # Only 2 requests should be processed.
      assert len(request.call_args_list) == 2
      # Verify token exchange request parameters.
      self.assert_token_request_kwargs(
          request.call_args_list[0][1], token_headers, token_request_data
      )
      # Verify service account impersonation request parameters.
      self.assert_impersonation_request_kwargs(
          request.call_args_list[1][1],
          impersonation_headers,
          impersonation_request_data,
      )
      assert credentials.valid
      assert credentials.expiry == expected_expiry
      assert not credentials.expired
      assert credentials.token == impersonation_response["accessToken"]
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  def test_refresh_without_client_auth_success_explicit_user_scopes_ignore_default_scopes(
      self, mock_auth_lib_value
  ):
      """User-supplied scopes win over default scopes during refresh."""
      user_scopes = ["scope1", "scope2"]
      expected_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
      }
      expected_body = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "scope": "scope1 scope2",
          "subject_token": "subject_token_0",
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
      }

      fake_request = self.make_mock_request(
          status=http_client.OK, data=self.SUCCESS_RESPONSE
      )
      # The default scopes are expected to be dropped in favor of user scopes.
      creds = self.make_credentials(scopes=user_scopes, default_scopes=["ignored"])
      creds.refresh(fake_request)

      sent_kwargs = fake_request.call_args[1]
      self.assert_token_request_kwargs(sent_kwargs, expected_headers, expected_body)
      assert creds.valid
      assert not creds.expired
      assert creds.token == self.SUCCESS_RESPONSE["access_token"]
      assert creds.has_scopes(["scope1", "scope2"])
      assert not creds.has_scopes(["ignored"])
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  def test_refresh_without_client_auth_success_explicit_default_scopes_only(
      self, mock_auth_lib_value
  ):
      """Default scopes are used for refresh when user scopes are ``None``."""
      fallback_scopes = ["scope1", "scope2"]
      expected_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
      }
      expected_body = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "scope": "scope1 scope2",
          "subject_token": "subject_token_0",
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
      }

      fake_request = self.make_mock_request(
          status=http_client.OK, data=self.SUCCESS_RESPONSE
      )
      # With no user scopes, the default scopes are the ones requested.
      creds = self.make_credentials(scopes=None, default_scopes=fallback_scopes)
      creds.refresh(fake_request)

      sent_kwargs = fake_request.call_args[1]
      self.assert_token_request_kwargs(sent_kwargs, expected_headers, expected_body)
      assert creds.valid
      assert not creds.expired
      assert creds.token == self.SUCCESS_RESPONSE["access_token"]
      assert creds.has_scopes(["scope1", "scope2"])
  def test_refresh_without_client_auth_error(self):
      """A rejected token exchange raises ``OAuthError`` and sets no token."""
      expected_message = r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
      failing_request = self.make_mock_request(
          status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
      )
      creds = self.make_credentials()

      with pytest.raises(exceptions.OAuthError) as excinfo:
          creds.refresh(failing_request)

      assert excinfo.match(expected_message)
      # The failed refresh must leave the credentials without a token.
      assert not creds.expired
      assert creds.token is None
  def test_refresh_impersonation_without_client_auth_error(self):
      """A failing impersonation call raises ``RefreshError``.

      The token exchange itself succeeds; only the impersonation response
      is an error.
      """
      failing_request = self.make_mock_request(
          status=http_client.OK,
          data=self.SUCCESS_RESPONSE,
          impersonation_status=http_client.BAD_REQUEST,
          impersonation_data=self.IMPERSONATION_ERROR_RESPONSE,
      )
      creds = self.make_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=self.SCOPES,
      )

      with pytest.raises(exceptions.RefreshError) as excinfo:
          creds.refresh(failing_request)

      assert excinfo.match(r"Unable to acquire impersonated credentials")
      # The failed refresh must leave the credentials without a token.
      assert not creds.expired
      assert creds.token is None
  def test_refresh_impersonation_invalid_impersonated_url_error(self):
      """An unparsable impersonation URL makes refresh raise ``RefreshError``."""
      expected_message = r"Unable to determine target principal from service account impersonation URL."
      creds = self.make_credentials(
          service_account_impersonation_url="https://iamcredentials.googleapis.com/v1/invalid",
          scopes=self.SCOPES,
      )

      # No request object is supplied for this refresh attempt.
      with pytest.raises(exceptions.RefreshError) as excinfo:
          creds.refresh(None)

      assert excinfo.match(expected_message)
      assert not creds.expired
      assert creds.token is None
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  def test_refresh_with_client_auth_success(self, mock_auth_lib_value):
      """Refresh with client id/secret sends a basic ``Authorization`` header."""
      expected_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
      }
      expected_body = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
      }

      fake_request = self.make_mock_request(
          status=http_client.OK, data=self.SUCCESS_RESPONSE
      )
      creds = self.make_credentials(
          client_id=CLIENT_ID, client_secret=CLIENT_SECRET
      )
      creds.refresh(fake_request)

      sent_kwargs = fake_request.call_args[1]
      self.assert_token_request_kwargs(sent_kwargs, expected_headers, expected_body)
      assert creds.valid
      assert not creds.expired
      assert creds.token == self.SUCCESS_RESPONSE["access_token"]
  # NOTE: patch decorators are applied bottom-up, so the lowest decorator
  # supplies the first mock argument. The order below keeps each argument
  # name aligned with the function it actually mocks.
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  @mock.patch(
      "google.auth.metrics.token_request_access_token_impersonate",
      return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  )
  def test_refresh_impersonation_with_client_auth_success_ignore_default_scopes(
      self, mock_metrics_header_value, mock_auth_lib_value
  ):
      """Impersonated refresh with client auth uses the user scopes.

      Both user scopes and default scopes are configured; the impersonation
      request is expected to carry the user scopes only.

      Args:
          mock_metrics_header_value: Mock for
              ``google.auth.metrics.token_request_access_token_impersonate``.
          mock_auth_lib_value: Mock for
              ``google.auth.metrics.python_and_auth_lib_version``.
      """
      # Simulate service account access token expires in 2800 seconds.
      expire_time = (
          _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
      ).isoformat("T") + "Z"
      expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")

      # STS token exchange request/response.
      token_response = self.SUCCESS_RESPONSE.copy()
      token_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
      }
      token_request_data = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
          "scope": "https://www.googleapis.com/auth/iam",
      }

      # Service account impersonation request/response.
      impersonation_response = {
          "accessToken": "SA_ACCESS_TOKEN",
          "expireTime": expire_time,
      }
      impersonation_headers = {
          "Content-Type": "application/json",
          "authorization": "Bearer {}".format(token_response["access_token"]),
          "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
          "x-allowed-locations": "0x0",
      }
      impersonation_request_data = {
          "delegates": None,
          "scope": self.SCOPES,
          "lifetime": "3600s",
      }

      # Initialize mock request to handle token exchange and service account
      # impersonation request.
      request = self.make_mock_request(
          status=http_client.OK,
          data=token_response,
          impersonation_status=http_client.OK,
          impersonation_data=impersonation_response,
      )
      # Initialize credentials with service account impersonation and basic auth.
      credentials = self.make_credentials(
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=self.SCOPES,
          # Default scopes will be ignored since user scopes are specified.
          default_scopes=["ignored"],
      )

      credentials.refresh(request)

      # Only 2 requests should be processed.
      assert len(request.call_args_list) == 2
      # Verify token exchange request parameters.
      self.assert_token_request_kwargs(
          request.call_args_list[0][1], token_headers, token_request_data
      )
      # Verify service account impersonation request parameters.
      self.assert_impersonation_request_kwargs(
          request.call_args_list[1][1],
          impersonation_headers,
          impersonation_request_data,
      )
      assert credentials.valid
      assert credentials.expiry == expected_expiry
      assert not credentials.expired
      assert credentials.token == impersonation_response["accessToken"]
  # NOTE: patch decorators are applied bottom-up, so the lowest decorator
  # supplies the first mock argument. The order below keeps each argument
  # name aligned with the function it actually mocks.
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  @mock.patch(
      "google.auth.metrics.token_request_access_token_impersonate",
      return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  )
  def test_refresh_impersonation_with_client_auth_success_use_default_scopes(
      self, mock_metrics_header_value, mock_auth_lib_value
  ):
      """Impersonated refresh with client auth falls back to default scopes.

      User scopes are ``None``, so the impersonation request is expected to
      carry the default scopes.

      Args:
          mock_metrics_header_value: Mock for
              ``google.auth.metrics.token_request_access_token_impersonate``.
          mock_auth_lib_value: Mock for
              ``google.auth.metrics.python_and_auth_lib_version``.
      """
      # Simulate service account access token expires in 2800 seconds.
      expire_time = (
          _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
      ).isoformat("T") + "Z"
      expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")

      # STS token exchange request/response.
      token_response = self.SUCCESS_RESPONSE.copy()
      token_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
      }
      token_request_data = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
          "scope": "https://www.googleapis.com/auth/iam",
      }

      # Service account impersonation request/response.
      impersonation_response = {
          "accessToken": "SA_ACCESS_TOKEN",
          "expireTime": expire_time,
      }
      impersonation_headers = {
          "Content-Type": "application/json",
          "authorization": "Bearer {}".format(token_response["access_token"]),
          "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
          "x-allowed-locations": "0x0",
      }
      impersonation_request_data = {
          "delegates": None,
          "scope": self.SCOPES,
          "lifetime": "3600s",
      }

      # Initialize mock request to handle token exchange and service account
      # impersonation request.
      request = self.make_mock_request(
          status=http_client.OK,
          data=token_response,
          impersonation_status=http_client.OK,
          impersonation_data=impersonation_response,
      )
      # Initialize credentials with service account impersonation and basic auth.
      credentials = self.make_credentials(
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=None,
          # Default scopes will be used since user specified scopes are none.
          default_scopes=self.SCOPES,
      )

      credentials.refresh(request)

      # Only 2 requests should be processed.
      assert len(request.call_args_list) == 2
      # Verify token exchange request parameters.
      self.assert_token_request_kwargs(
          request.call_args_list[0][1], token_headers, token_request_data
      )
      # Verify service account impersonation request parameters.
      self.assert_impersonation_request_kwargs(
          request.call_args_list[1][1],
          impersonation_headers,
          impersonation_request_data,
      )
      assert credentials.valid
      assert credentials.expiry == expected_expiry
      assert not credentials.expired
      assert credentials.token == impersonation_response["accessToken"]
  def test_apply_without_quota_project_id(self):
      """``apply`` fills empty headers after a refresh, with no quota project."""
      fake_request = self.make_mock_request(
          status=http_client.OK, data=self.SUCCESS_RESPONSE
      )
      creds = self.make_credentials()
      creds.refresh(fake_request)

      applied_headers = {}
      creds.apply(applied_headers)

      expected_headers = {
          "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
          "x-allowed-locations": "0x0",
      }
      assert applied_headers == expected_headers
  def test_apply_workforce_without_quota_project_id(self):
      """``apply`` on workforce credentials adds no quota project header.

      A workforce pool user project is configured, yet the expected headers
      contain no ``x-goog-user-project`` entry.
      """
      fake_request = self.make_mock_request(
          status=http_client.OK, data=self.SUCCESS_RESPONSE
      )
      workforce_creds = self.make_workforce_pool_credentials(
          workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
      )
      workforce_creds.refresh(fake_request)

      applied_headers = {}
      workforce_creds.apply(applied_headers)

      expected_headers = {
          "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
          "x-allowed-locations": "0x0",
      }
      assert applied_headers == expected_headers
  def test_apply_impersonation_without_quota_project_id(self):
      """``apply`` uses the impersonated token when impersonation is set."""
      # Impersonated token that expires one hour from now.
      one_hour_ahead = _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(
          seconds=3600
      )
      expire_time = one_hour_ahead.isoformat("T") + "Z"
      impersonation_response = {
          "accessToken": "SA_ACCESS_TOKEN",
          "expireTime": expire_time,
      }

      # The mock request serves both the token exchange and the
      # impersonation call.
      fake_request = self.make_mock_request(
          status=http_client.OK,
          data=self.SUCCESS_RESPONSE.copy(),
          impersonation_status=http_client.OK,
          impersonation_data=impersonation_response,
      )
      impersonated_creds = self.make_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=self.SCOPES,
      )

      applied_headers = {}
      impersonated_creds.refresh(fake_request)
      impersonated_creds.apply(applied_headers)

      expected_headers = {
          "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
          "x-allowed-locations": "0x0",
      }
      assert applied_headers == expected_headers
  def test_apply_with_quota_project_id(self):
      """``apply`` keeps existing headers and adds the quota project header."""
      fake_request = self.make_mock_request(
          status=http_client.OK, data=self.SUCCESS_RESPONSE
      )
      creds = self.make_credentials(quota_project_id=self.QUOTA_PROJECT_ID)
      creds.refresh(fake_request)

      # Start from a pre-populated mapping to show unrelated entries survive.
      applied_headers = {"other": "header-value"}
      creds.apply(applied_headers)

      expected_headers = {
          "other": "header-value",
          "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
          "x-goog-user-project": self.QUOTA_PROJECT_ID,
          "x-allowed-locations": "0x0",
      }
      assert applied_headers == expected_headers
  def test_apply_impersonation_with_quota_project_id(self):
      """``apply`` with impersonation adds the token and quota project."""
      # Impersonated token that expires one hour from now.
      one_hour_ahead = _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(
          seconds=3600
      )
      expire_time = one_hour_ahead.isoformat("T") + "Z"
      impersonation_response = {
          "accessToken": "SA_ACCESS_TOKEN",
          "expireTime": expire_time,
      }

      # The mock request serves both the token exchange and the
      # impersonation call.
      fake_request = self.make_mock_request(
          status=http_client.OK,
          data=self.SUCCESS_RESPONSE.copy(),
          impersonation_status=http_client.OK,
          impersonation_data=impersonation_response,
      )
      impersonated_creds = self.make_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=self.SCOPES,
          quota_project_id=self.QUOTA_PROJECT_ID,
      )

      applied_headers = {"other": "header-value"}
      impersonated_creds.refresh(fake_request)
      impersonated_creds.apply(applied_headers)

      expected_headers = {
          "other": "header-value",
          "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
          "x-goog-user-project": self.QUOTA_PROJECT_ID,
          "x-allowed-locations": "0x0",
      }
      assert applied_headers == expected_headers
  def test_before_request(self):
      """``before_request`` yields the same headers on two consecutive calls."""
      applied_headers = {"other": "header-value"}
      fake_request = self.make_mock_request(
          status=http_client.OK, data=self.SUCCESS_RESPONSE
      )
      creds = self.make_credentials()
      call_args = (fake_request, "POST", "https://example.com/api", applied_headers)

      # The first call is the one expected to refresh and set the token.
      creds.before_request(*call_args)
      assert applied_headers == {
          "other": "header-value",
          "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
          "x-allowed-locations": "0x0",
      }

      # The second call should reuse the token instead of refreshing.
      creds.before_request(*call_args)
      assert applied_headers == {
          "other": "header-value",
          "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
          "x-allowed-locations": "0x0",
      }
  def test_before_request_workforce(self):
      """``before_request`` on workforce credentials is stable across calls."""
      applied_headers = {"other": "header-value"}
      fake_request = self.make_mock_request(
          status=http_client.OK, data=self.SUCCESS_RESPONSE
      )
      workforce_creds = self.make_workforce_pool_credentials(
          workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
      )
      call_args = (fake_request, "POST", "https://example.com/api", applied_headers)

      # The first call is the one expected to refresh and set the token.
      workforce_creds.before_request(*call_args)
      assert applied_headers == {
          "other": "header-value",
          "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
          "x-allowed-locations": "0x0",
      }

      # The second call should reuse the token instead of refreshing.
      workforce_creds.before_request(*call_args)
      assert applied_headers == {
          "other": "header-value",
          "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
          "x-allowed-locations": "0x0",
      }
  def test_before_request_impersonation(self):
      """Check before_request headers when service account impersonation is on.

      The authorization header must carry the access token from the mocked
      impersonation response, on both the first and the second call.
      """
      # The impersonated token is set to expire one hour from now.
      expires_at = _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(
          seconds=3600
      )
      expire_time = expires_at.isoformat("T") + "Z"
      sa_token = "SA_ACCESS_TOKEN"
      # Service account impersonation response.
      impersonation_response = {"accessToken": sa_token, "expireTime": expire_time}

      # The mock request answers both the token exchange and the service
      # account impersonation request.
      request = self.make_mock_request(
          status=http_client.OK,
          data=self.SUCCESS_RESPONSE.copy(),
          impersonation_status=http_client.OK,
          impersonation_data=impersonation_response,
      )
      headers = {"other": "header-value"}
      credentials = self.make_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
      )
      expected_headers = {
          "other": "header-value",
          "authorization": "Bearer {}".format(sa_token),
          "x-allowed-locations": "0x0",
      }

      # First call should call refresh, setting the token.
      credentials.before_request(request, "POST", "https://example.com/api", headers)
      assert headers == expected_headers

      # Second call shouldn't call refresh.
      credentials.before_request(request, "POST", "https://example.com/api", headers)
      assert headers == expected_headers
  @mock.patch("google.auth._helpers.utcnow")
  def test_before_request_expired(self, utcnow):
      """Check that before_request refreshes once the cached token goes stale.

      With the patched clock at datetime.min the credentials must report
      valid/FRESH and the cached token is applied. After the clock moves
      forward one second they must report expired/STALE, and the next
      before_request call must apply the token from the mocked response.
      """
      headers = {}
      request = self.make_mock_request(
          data=self.SUCCESS_RESPONSE, status=http_client.OK
      )
      credentials = self.make_credentials()
      credentials.token = "token"

      clock_start = datetime.datetime.min
      one_second = datetime.timedelta(seconds=1)
      utcnow.return_value = clock_start
      # Set the expiration to one second more than now plus the clock skew
      # accommodation. These credentials should be valid.
      credentials.expiry = clock_start + _helpers.REFRESH_THRESHOLD + one_second

      assert credentials.valid
      assert not credentials.expired
      assert credentials.token_state == TokenState.FRESH

      credentials.before_request(request, "POST", "https://example.com/api", headers)

      # Cached token should be used.
      cached_headers = {
          "authorization": "Bearer token",
          "x-allowed-locations": "0x0",
      }
      assert headers == cached_headers

      # Next call should simulate 1 second passed.
      utcnow.return_value = clock_start + one_second

      assert not credentials.valid
      assert credentials.expired
      assert credentials.token_state == TokenState.STALE

      credentials.before_request(request, "POST", "https://example.com/api", headers)
      assert credentials.token_state == TokenState.FRESH

      # New token should be retrieved.
      refreshed_headers = {
          "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
          "x-allowed-locations": "0x0",
      }
      assert headers == refreshed_headers
  @mock.patch("google.auth._helpers.utcnow")
  def test_before_request_impersonation_expired(self, utcnow):
      """Check the stale-token refresh path with service account impersonation.

      The cached token is applied while the patched clock sits at
      datetime.min. Once the clock advances by one second the credentials must
      report expired/STALE, and before_request must then apply the access
      token from the mocked impersonation response.
      """
      headers = {}
      clock_start = datetime.datetime.min
      one_second = datetime.timedelta(seconds=1)
      expire_time = (clock_start + datetime.timedelta(seconds=3601)).isoformat(
          "T"
      ) + "Z"
      sa_token = "SA_ACCESS_TOKEN"
      # Service account impersonation response.
      impersonation_response = {"accessToken": sa_token, "expireTime": expire_time}

      # The mock request answers both the token exchange and the service
      # account impersonation request.
      request = self.make_mock_request(
          status=http_client.OK,
          data=self.SUCCESS_RESPONSE.copy(),
          impersonation_status=http_client.OK,
          impersonation_data=impersonation_response,
      )
      credentials = self.make_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
      )
      credentials.token = "token"

      utcnow.return_value = clock_start
      # Set the expiration to one second more than now plus the clock skew
      # accommodation. These credentials should be valid.
      credentials.expiry = clock_start + _helpers.REFRESH_THRESHOLD + one_second

      assert credentials.valid
      assert not credentials.expired
      assert credentials.token_state == TokenState.FRESH

      credentials.before_request(request, "POST", "https://example.com/api", headers)
      assert credentials.token_state == TokenState.FRESH

      # Cached token should be used.
      cached_headers = {
          "authorization": "Bearer token",
          "x-allowed-locations": "0x0",
      }
      assert headers == cached_headers

      # Next call should simulate 1 second passed. This will trigger the
      # expiration threshold.
      utcnow.return_value = clock_start + one_second

      assert not credentials.valid
      assert credentials.expired
      assert credentials.token_state == TokenState.STALE

      credentials.before_request(request, "POST", "https://example.com/api", headers)
      assert credentials.token_state == TokenState.FRESH

      # before_request is invoked once more before the headers are inspected.
      credentials.before_request(request, "POST", "https://example.com/api", headers)

      # New token should be retrieved.
      refreshed_headers = {
          "authorization": "Bearer {}".format(sa_token),
          "x-allowed-locations": "0x0",
      }
      assert headers == refreshed_headers
  @pytest.mark.parametrize(
      "audience",
      [
          # Legacy K8s audience format.
          "identitynamespace:1f12345:my_provider",
          # Unrealistic audiences.
          "//iam.googleapis.com/projects",
          "//iam.googleapis.com/projects/",
          "//iam.googleapis.com/project/123456",
          "//iam.googleapis.com/projects//123456",
          "//iam.googleapis.com/prefix_projects/123456",
          "//iam.googleapis.com/projects_suffix/123456",
      ],
  )
  def test_project_number_indeterminable(self, audience):
      """Check that these audiences yield neither a project number nor an ID."""
      init_kwargs = {
          "audience": audience,
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
          "token_url": self.TOKEN_URL,
          "credential_source": self.CREDENTIAL_SOURCE,
      }
      credentials = CredentialsImpl(**init_kwargs)

      assert credentials.project_number is None
      # get_project_id is called with no request object here.
      assert credentials.get_project_id(None) is None
  def test_project_number_determinable(self):
      """Check that the default test audience yields the expected project number."""
      init_kwargs = {
          "audience": self.AUDIENCE,
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
          "token_url": self.TOKEN_URL,
          "credential_source": self.CREDENTIAL_SOURCE,
      }
      credentials = CredentialsImpl(**init_kwargs)

      assert credentials.project_number == self.PROJECT_NUMBER
  def test_project_number_workforce(self):
      """Check that a workforce audience yields no project number."""
      init_kwargs = {
          "audience": self.WORKFORCE_AUDIENCE,
          "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
          "token_url": self.TOKEN_URL,
          "credential_source": self.CREDENTIAL_SOURCE,
          "workforce_pool_user_project": self.WORKFORCE_POOL_USER_PROJECT,
      }
      credentials = CredentialsImpl(**init_kwargs)

      assert credentials.project_number is None
  def test_project_id_without_scopes(self):
      """Check that get_project_id(None) is None when no scopes were given."""
      # No `scopes` argument is passed to the constructor.
      init_kwargs = {
          "audience": self.AUDIENCE,
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
          "token_url": self.TOKEN_URL,
          "credential_source": self.CREDENTIAL_SOURCE,
      }
      credentials = CredentialsImpl(**init_kwargs)

      assert credentials.get_project_id(None) is None
  @mock.patch(
      "google.auth.metrics.token_request_access_token_impersonate",
      return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  )
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  def test_get_project_id_cloud_resource_manager_success(
      self, mock_metrics_header_value, mock_auth_lib_value
  ):
      """Check get_project_id with impersonation against a mocked happy path.

      Three requests are expected: token exchange, service account
      impersonation, and the cloud resource manager lookup. A second
      get_project_id call must not add any further request.
      """
      # NOTE: stacked mock.patch decorators inject their mocks bottom-up, so
      # the first mock argument belongs to python_and_auth_lib_version.
      # Neither mock argument is read in this test body.

      # STS token exchange request/response.
      sts_response = self.SUCCESS_RESPONSE.copy()
      sts_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
      }
      sts_form = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
          "scope": "https://www.googleapis.com/auth/iam",
      }

      # Service account impersonation request/response.
      expires_at = _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(
          seconds=3600
      )
      expire_time = expires_at.isoformat("T") + "Z"
      expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
      sa_token = "SA_ACCESS_TOKEN"
      sa_response = {"accessToken": sa_token, "expireTime": expire_time}
      sa_headers = {
          "Content-Type": "application/json",
          "x-goog-user-project": self.QUOTA_PROJECT_ID,
          "authorization": "Bearer {}".format(sts_response["access_token"]),
          "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
          "x-allowed-locations": "0x0",
      }
      sa_body = {
          "delegates": None,
          "scope": self.SCOPES,
          "lifetime": "3600s",
      }

      # Headers expected on the cloud resource manager request.
      crm_headers = {
          "x-goog-user-project": self.QUOTA_PROJECT_ID,
          "authorization": "Bearer {}".format(sa_token),
          "x-allowed-locations": "0x0",
      }

      # The mock request answers the token exchange, the service account
      # impersonation and the cloud resource manager request.
      request = self.make_mock_request(
          status=http_client.OK,
          data=self.SUCCESS_RESPONSE.copy(),
          impersonation_status=http_client.OK,
          impersonation_data=sa_response,
          cloud_resource_manager_status=http_client.OK,
          cloud_resource_manager_data=self.CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE,
      )
      credentials = self.make_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=self.SCOPES,
          quota_project_id=self.QUOTA_PROJECT_ID,
      )

      # Expected project ID from cloud resource manager response should be
      # returned.
      assert credentials.get_project_id(request) == self.PROJECT_ID

      # 3 requests should be processed.
      assert len(request.call_args_list) == 3
      sts_call, sa_call, crm_call = request.call_args_list

      # Verify token exchange request parameters.
      self.assert_token_request_kwargs(sts_call[1], sts_headers, sts_form)

      # Verify service account impersonation request parameters.
      self.assert_impersonation_request_kwargs(sa_call[1], sa_headers, sa_body)

      # In the process of getting project ID, an access token should be
      # retrieved.
      assert credentials.valid
      assert credentials.expiry == expected_expiry
      assert not credentials.expired
      assert credentials.token == sa_token

      # Verify cloud resource manager request parameters.
      self.assert_resource_manager_request_kwargs(
          crm_call[1], self.PROJECT_NUMBER, crm_headers
      )

      # Calling get_project_id again should return the cached project_id.
      assert credentials.get_project_id(request) == self.PROJECT_ID

      # No additional requests.
      assert len(request.call_args_list) == 3
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  def test_workforce_pool_get_project_id_cloud_resource_manager_success(
      self, mock_auth_lib_value
  ):
      """Check get_project_id for workforce pool credentials on a mocked happy path.

      Two requests are expected: the token exchange and the cloud resource
      manager lookup. A second get_project_id call must not add any further
      request.
      """
      # STS token exchange request/response.
      user_project_options = json.dumps(
          {"userProject": self.WORKFORCE_POOL_USER_PROJECT}
      )
      sts_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
      }
      sts_form = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.WORKFORCE_AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
          "scope": "scope1 scope2",
          "options": urllib.parse.quote(user_project_options),
      }

      # The mock request answers the token exchange and the cloud resource
      # manager request.
      request = self.make_mock_request(
          status=http_client.OK,
          data=self.SUCCESS_RESPONSE.copy(),
          cloud_resource_manager_status=http_client.OK,
          cloud_resource_manager_data=self.CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE,
      )
      credentials = self.make_workforce_pool_credentials(
          scopes=self.SCOPES,
          quota_project_id=self.QUOTA_PROJECT_ID,
          workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
      )

      # Expected project ID from cloud resource manager response should be
      # returned.
      assert credentials.get_project_id(request) == self.PROJECT_ID

      # 2 requests should be processed.
      assert len(request.call_args_list) == 2
      sts_call, crm_call = request.call_args_list

      # Verify token exchange request parameters.
      self.assert_token_request_kwargs(sts_call[1], sts_headers, sts_form)

      # In the process of getting project ID, an access token should be
      # retrieved.
      assert credentials.valid
      assert not credentials.expired
      assert credentials.token == self.SUCCESS_RESPONSE["access_token"]

      # Verify cloud resource manager request parameters.
      crm_headers = {
          "x-goog-user-project": self.QUOTA_PROJECT_ID,
          "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
          "x-allowed-locations": "0x0",
      }
      self.assert_resource_manager_request_kwargs(
          crm_call[1], self.WORKFORCE_POOL_USER_PROJECT, crm_headers
      )

      # Calling get_project_id again should return the cached project_id.
      assert credentials.get_project_id(request) == self.PROJECT_ID

      # No additional requests.
      assert len(request.call_args_list) == 2
  @mock.patch(
      "google.auth.metrics.token_request_access_token_impersonate",
      return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  )
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  def test_refresh_impersonation_with_lifetime(
      self, mock_metrics_header_value, mock_auth_lib_value
  ):
      """Check refresh with impersonation and a 2800 second token lifetime option.

      The impersonation request body must carry "2800s" as its lifetime, and
      the credentials must end up with the token and expiry from the mocked
      impersonation response.
      """
      # NOTE: stacked mock.patch decorators inject their mocks bottom-up, so
      # the first mock argument belongs to python_and_auth_lib_version.
      # Neither mock argument is read in this test body.

      # Simulate service account access token expires in 2800 seconds.
      expires_at = _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(
          seconds=2800
      )
      expire_time = expires_at.isoformat("T") + "Z"
      expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")

      # STS token exchange request/response.
      sts_response = self.SUCCESS_RESPONSE.copy()
      sts_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/true",
      }
      sts_form = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
          "scope": "https://www.googleapis.com/auth/iam",
      }

      # Service account impersonation request/response.
      sa_token = "SA_ACCESS_TOKEN"
      sa_response = {"accessToken": sa_token, "expireTime": expire_time}
      sa_headers = {
          "Content-Type": "application/json",
          "authorization": "Bearer {}".format(sts_response["access_token"]),
          "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
          "x-allowed-locations": "0x0",
      }
      sa_body = {
          "delegates": None,
          "scope": self.SCOPES,
          "lifetime": "2800s",
      }

      # The mock request answers the token exchange and the service account
      # impersonation request.
      request = self.make_mock_request(
          status=http_client.OK,
          data=sts_response,
          impersonation_status=http_client.OK,
          impersonation_data=sa_response,
      )
      # Initialize credentials with service account impersonation.
      credentials = self.make_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          service_account_impersonation_options={"token_lifetime_seconds": 2800},
          scopes=self.SCOPES,
      )

      credentials.refresh(request)

      # Only 2 requests should be processed.
      assert len(request.call_args_list) == 2
      sts_call, sa_call = request.call_args_list

      # Verify token exchange request parameters.
      self.assert_token_request_kwargs(sts_call[1], sts_headers, sts_form)

      # Verify service account impersonation request parameters.
      self.assert_impersonation_request_kwargs(sa_call[1], sa_headers, sa_body)

      assert credentials.valid
      assert credentials.expiry == expected_expiry
      assert not credentials.expired
      assert credentials.token == sa_token
  def test_get_project_id_cloud_resource_manager_error(self):
      """Check that get_project_id is None when the resource manager returns 401."""
      # Simulate resource doesn't have sufficient permissions to access
      # cloud resource manager: the token exchange succeeds, the lookup is
      # answered with UNAUTHORIZED.
      request = self.make_mock_request(
          data=self.SUCCESS_RESPONSE.copy(),
          status=http_client.OK,
          cloud_resource_manager_status=http_client.UNAUTHORIZED,
      )
      credentials = self.make_credentials(scopes=self.SCOPES)

      assert credentials.get_project_id(request) is None

      # Only 2 requests to STS and cloud resource manager should be sent.
      sent_requests = request.call_args_list
      assert len(sent_requests) == 2
  def test_supplier_context():
      """Check that SupplierContext exposes the two values it was built with."""
      token_type = "TestTokenType"
      audience = "TestAudience"

      supplier_context = external_account.SupplierContext(token_type, audience)

      assert supplier_context.subject_token_type == token_type
      assert supplier_context.audience == audience