test_external_account.py 77 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915
  1. # Copyright 2020 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import datetime
  15. import http.client as http_client
  16. import json
  17. import urllib
  18. import mock
  19. import pytest # type: ignore
  20. from google.auth import _helpers
  21. from google.auth import exceptions
  22. from google.auth import external_account
  23. from google.auth import transport
  24. from google.auth.credentials import TokenState
  # Metrics header value associated with an impersonated access-token request.
  IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/imp"
  # Value the tests make google.auth.metrics.python_and_auth_lib_version return.
  LANG_LIBRARY_METRICS_HEADER_VALUE = "gl-python/3.7 auth/1.1"

  CLIENT_ID = "username"
  CLIENT_SECRET = "password"
  # Base64 encoding of "username:password".
  BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
  SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"

  # Audiences that identify a workforce pool.
  TEST_USER_AUDIENCES = [
      "//iam.googleapis.com/locations/global/workforcePools/pool-id/providers/provider-id",
      "//iam.googleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
      "//iam.googleapis.com/locations/eu/workforcePools/workloadIdentityPools/providers/provider-id",
  ]

  # Workload identity pool audiences, plus malformed workforce pool audiences.
  TEST_NON_USER_AUDIENCES = [
      # Legacy K8s audience format.
      "identitynamespace:1f12345:my_provider",
      "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/pool-id/providers/provider-id",
      "//iam.googleapis.com/projects/123456/locations/eu/workloadIdentityPools/pool-id/providers/provider-id",
      # Workload pool whose pool ID happens to be the string "workforcePools".
      "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/workforcePools/providers/provider-id",
      # Unrealistic / incorrect workforce pool audiences.
      "//iamgoogleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
      "//iam.googleapiscom/locations/eu/workforcePools/pool-id/providers/provider-id",
      "//iam.googleapis.com/locations/workforcePools/pool-id/providers/provider-id",
      "//iam.googleapis.com/locations/eu/workforcePool/pool-id/providers/provider-id",
      "//iam.googleapis.com/locations//workforcePool/pool-id/providers/provider-id",
  ]
  class CredentialsImpl(external_account.Credentials):
      """Concrete ``external_account.Credentials`` subclass used by the tests.

      ``retrieve_subject_token`` hands out tokens named ``subject_token_<n>``,
      where ``n`` starts at zero and grows by one on every call.
      """

      def __init__(self, **kwargs):
          super().__init__(**kwargs)
          # Number of subject tokens handed out so far.
          self._counter = 0

      def retrieve_subject_token(self, request):
          """Return the next numbered subject token; ``request`` is not used."""
          token = "subject_token_{}".format(self._counter)
          self._counter += 1
          return token
  75. class TestCredentials(object):
  # STS endpoints.
  TOKEN_URL = "https://sts.googleapis.com/v1/token"
  TOKEN_INFO_URL = "https://sts.googleapis.com/v1/introspect"

  # Identifiers the audiences below are built from.
  PROJECT_NUMBER = "123456"
  POOL_ID = "POOL_ID"
  PROVIDER_ID = "PROVIDER_ID"
  # Workload identity pool audience.
  AUDIENCE = (
      "//iam.googleapis.com/projects/{}"
      "/locations/global/workloadIdentityPools/{}"
      "/providers/{}".format(PROJECT_NUMBER, POOL_ID, PROVIDER_ID)
  )
  # Workforce pool audience.
  WORKFORCE_AUDIENCE = "//iam.googleapis.com/locations/global/workforcePools/{}/providers/{}".format(
      POOL_ID, PROVIDER_ID
  )
  WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER"

  SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
  WORKFORCE_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token"
  CREDENTIAL_SOURCE = {"file": "/var/run/secrets/goog.id/token"}

  # Canned token-exchange payloads: one successful, one error.
  SUCCESS_RESPONSE = {
      "access_token": "ACCESS_TOKEN",
      "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
      "token_type": "Bearer",
      "expires_in": 3600,
      "scope": "scope1 scope2",
  }
  ERROR_RESPONSE = {
      "error": "invalid_request",
      "error_description": "Invalid subject token",
      "error_uri": "https://tools.ietf.org/html/rfc6749",
  }

  QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
  # generateAccessToken URL for SERVICE_ACCOUNT_EMAIL.
  SERVICE_ACCOUNT_IMPERSONATION_URL = (
      "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
      "/serviceAccounts/{}:generateAccessToken"
  ).format(SERVICE_ACCOUNT_EMAIL)
  SCOPES = ["scope1", "scope2"]
  IMPERSONATION_ERROR_RESPONSE = {
      "error": {
          "code": 400,
          "message": "Request contains an invalid argument",
          "status": "INVALID_ARGUMENT",
      }
  }

  PROJECT_ID = "my-proj-id"
  # Base URL; the resource manager assertion helper appends a project number.
  CLOUD_RESOURCE_MANAGER_URL = "https://cloudresourcemanager.googleapis.com/v1/projects/"
  CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE = {
      "projectNumber": PROJECT_NUMBER,
      "projectId": PROJECT_ID,
      "lifecycleState": "ACTIVE",
      "name": "project-name",
      "createTime": "2018-11-06T04:42:54.109Z",
      "parent": {"type": "folder", "id": "12345678901"},
  }
  @classmethod
  def make_credentials(
      cls,
      client_id=None,
      client_secret=None,
      quota_project_id=None,
      token_info_url=None,
      scopes=None,
      default_scopes=None,
      service_account_impersonation_url=None,
      service_account_impersonation_options=None,
      universe_domain=external_account._DEFAULT_UNIVERSE_DOMAIN,
  ):
      """Build a ``CredentialsImpl`` for the workload identity pool audience.

      The audience, subject token type, token URL and credential source are
      taken from the class constants; everything else comes from the keyword
      arguments.

      Args:
          client_id: Optional client ID forwarded to the credentials.
          client_secret: Optional client secret forwarded to the credentials.
          quota_project_id: Optional quota project ID.
          token_info_url: Optional token introspection URL.
          scopes: Optional user scopes.
          default_scopes: Optional default scopes.
          service_account_impersonation_url: Optional impersonation URL.
          service_account_impersonation_options: Optional impersonation
              options dict. ``None`` (the default) becomes a new empty dict.
          universe_domain: Universe domain forwarded to the credentials.

      Returns:
          The constructed ``CredentialsImpl`` instance.
      """
      # Create the dict per call rather than sharing one mutable default
      # object across every invocation of this factory.
      if service_account_impersonation_options is None:
          service_account_impersonation_options = {}
      return CredentialsImpl(
          audience=cls.AUDIENCE,
          subject_token_type=cls.SUBJECT_TOKEN_TYPE,
          token_url=cls.TOKEN_URL,
          token_info_url=token_info_url,
          service_account_impersonation_url=service_account_impersonation_url,
          service_account_impersonation_options=service_account_impersonation_options,
          credential_source=cls.CREDENTIAL_SOURCE,
          client_id=client_id,
          client_secret=client_secret,
          quota_project_id=quota_project_id,
          scopes=scopes,
          default_scopes=default_scopes,
          universe_domain=universe_domain,
      )
  @classmethod
  def make_workforce_pool_credentials(
      cls,
      client_id=None,
      client_secret=None,
      quota_project_id=None,
      scopes=None,
      default_scopes=None,
      service_account_impersonation_url=None,
      workforce_pool_user_project=None,
  ):
      """Build a ``CredentialsImpl`` for the workforce pool audience.

      The fixed settings (audience, subject token type, token URL, credential
      source) come from the class constants; the rest is forwarded unchanged
      from the keyword arguments.
      """
      init_kwargs = {
          "audience": cls.WORKFORCE_AUDIENCE,
          "subject_token_type": cls.WORKFORCE_SUBJECT_TOKEN_TYPE,
          "token_url": cls.TOKEN_URL,
          "service_account_impersonation_url": service_account_impersonation_url,
          "credential_source": cls.CREDENTIAL_SOURCE,
          "client_id": client_id,
          "client_secret": client_secret,
          "quota_project_id": quota_project_id,
          "scopes": scopes,
          "default_scopes": default_scopes,
          "workforce_pool_user_project": workforce_pool_user_project,
      }
      return CredentialsImpl(**init_kwargs)
  @classmethod
  def make_mock_request(
      cls,
      status=http_client.OK,
      data=None,
      impersonation_status=None,
      impersonation_data=None,
      cloud_resource_manager_status=None,
      cloud_resource_manager_data=None,
  ):
      """Return a mock ``transport.Request`` that replays canned responses.

      The STS token-exchange response always comes first. An impersonation
      response and then a Cloud Resource Manager response are queued after it,
      each only when its status argument is truthy. Every payload is
      JSON-encoded to UTF-8 bytes.
      """

      def _canned_response(code, payload):
          # Autospec'd transport.Response carrying the given status and body.
          fake = mock.create_autospec(transport.Response, instance=True)
          fake.status = code
          fake.data = json.dumps(payload).encode("utf-8")
          return fake

      # STS token exchange response.
      queued = [_canned_response(status, data)]
      # Service account impersonation response, if requested.
      if impersonation_status:
          queued.append(_canned_response(impersonation_status, impersonation_data))
      # Cloud Resource Manager response, if requested.
      if cloud_resource_manager_status:
          queued.append(
              _canned_response(
                  cloud_resource_manager_status, cloud_resource_manager_data
              )
          )

      mock_request = mock.create_autospec(transport.Request)
      mock_request.side_effect = queued
      return mock_request
  @classmethod
  def assert_token_request_kwargs(cls, request_kwargs, headers, request_data):
      """Assert ``request_kwargs`` describes the expected token-exchange POST.

      Checks the URL, method and headers, then parses the form-encoded body
      and compares every key/value pair with ``request_data``. The number of
      parsed pairs must equal the number of expected entries.
      """
      assert request_kwargs["url"] == cls.TOKEN_URL
      assert request_kwargs["method"] == "POST"
      assert request_kwargs["headers"] == headers
      assert request_kwargs["body"] is not None

      parsed_pairs = urllib.parse.parse_qsl(request_kwargs["body"])
      for raw_key, raw_value in parsed_pairs:
          # Keys and values are decoded from bytes before comparison.
          assert raw_value.decode("utf-8") == request_data[raw_key.decode("utf-8")]
      assert len(parsed_pairs) == len(request_data.keys())
  @classmethod
  def assert_impersonation_request_kwargs(cls, request_kwargs, headers, request_data):
      """Assert ``request_kwargs`` describes the expected impersonation POST.

      The body is decoded as UTF-8 JSON and must equal ``request_data``.
      """
      assert request_kwargs["url"] == cls.SERVICE_ACCOUNT_IMPERSONATION_URL
      assert request_kwargs["method"] == "POST"
      assert request_kwargs["headers"] == headers
      assert request_kwargs["body"] is not None

      decoded_body = request_kwargs["body"].decode("utf-8")
      assert json.loads(decoded_body) == request_data
  @classmethod
  def assert_resource_manager_request_kwargs(
      cls, request_kwargs, project_number, headers
  ):
      """Assert ``request_kwargs`` describes the expected project lookup GET.

      The URL must be the resource manager base URL followed by
      ``project_number``, and the request must carry no ``body`` entry.
      """
      expected_url = cls.CLOUD_RESOURCE_MANAGER_URL + project_number
      assert request_kwargs["url"] == expected_url
      assert request_kwargs["method"] == "GET"
      assert request_kwargs["headers"] == headers
      assert "body" not in request_kwargs
  def test_default_state(self):
      """Newly built credentials carry configuration but no token state."""
      creds = self.make_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
      )

      # The token URL and the impersonation URL are both populated.
      assert creds._token_url
      assert creds._service_account_impersonation_url
      # No token has been acquired yet.
      assert not creds.token
      assert not creds.valid
      # No expiration has been recorded yet.
      assert not creds.expiry
      assert not creds.expired
      # No scopes were supplied, so scopes are still required.
      assert not creds.scopes
      assert creds.requires_scopes
      assert not creds.quota_project_id
      # No token info URL was supplied.
      assert not creds.token_info_url
  def test_nonworkforce_with_workforce_pool_user_project(self):
      """A workforce user project on a non-workforce audience is rejected."""
      init_kwargs = dict(
          audience=self.AUDIENCE,
          subject_token_type=self.SUBJECT_TOKEN_TYPE,
          token_url=self.TOKEN_URL,
          credential_source=self.CREDENTIAL_SOURCE,
          workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
      )

      with pytest.raises(ValueError) as excinfo:
          CredentialsImpl(**init_kwargs)

      assert excinfo.match(
          "workforce_pool_user_project should not be set for non-workforce "
          "pool credentials"
      )
  def test_with_scopes(self):
      """``with_scopes`` yields credentials that no longer require scopes."""
      unscoped = self.make_credentials()
      assert not unscoped.scopes
      assert unscoped.requires_scopes

      scoped = unscoped.with_scopes(["email"])

      assert scoped.has_scopes(["email"])
      assert not scoped.requires_scopes
  def test_with_scopes_workforce_pool(self):
      """``with_scopes`` keeps the workforce pool user project."""
      unscoped = self.make_workforce_pool_credentials(
          workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
      )
      assert not unscoped.scopes
      assert unscoped.requires_scopes

      scoped = unscoped.with_scopes(["email"])

      assert scoped.has_scopes(["email"])
      assert not scoped.requires_scopes
      user_project = scoped.info.get("workforce_pool_user_project")
      assert user_project == self.WORKFORCE_POOL_USER_PROJECT
  def test_with_scopes_using_user_and_default_scopes(self):
      """User scopes take effect when default scopes are also supplied."""
      unscoped = self.make_credentials()
      assert not unscoped.scopes
      assert unscoped.requires_scopes

      scoped = unscoped.with_scopes(["email"], default_scopes=["profile"])

      # has_scopes reflects the user scopes, not the default ones.
      assert scoped.has_scopes(["email"])
      assert not scoped.has_scopes(["profile"])
      assert not scoped.requires_scopes
      assert scoped.scopes == ["email"]
      assert scoped.default_scopes == ["profile"]
  def test_with_scopes_using_default_scopes_only(self):
      """Default scopes alone are enough to satisfy the scope requirement."""
      unscoped = self.make_credentials()
      assert not unscoped.scopes
      assert unscoped.requires_scopes

      scoped = unscoped.with_scopes(None, default_scopes=["profile"])

      assert scoped.has_scopes(["profile"])
      assert not scoped.requires_scopes
  def test_with_scopes_full_options_propagated(self):
      """``with_scopes`` re-initialises with every option, swapping scopes."""
      impersonation_options = {"token_lifetime_seconds": 2800}
      original = self.make_credentials(
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          quota_project_id=self.QUOTA_PROJECT_ID,
          scopes=self.SCOPES,
          token_info_url=self.TOKEN_INFO_URL,
          default_scopes=["default1"],
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          service_account_impersonation_options=impersonation_options,
      )

      with mock.patch.object(
          external_account.Credentials, "__init__", return_value=None
      ) as mock_init:
          original.with_scopes(["email"], ["default2"])

      # The base-class initialiser must have been called exactly once with
      # the original configuration and the new scopes.
      expected_kwargs = dict(
          audience=self.AUDIENCE,
          subject_token_type=self.SUBJECT_TOKEN_TYPE,
          token_url=self.TOKEN_URL,
          token_info_url=self.TOKEN_INFO_URL,
          credential_source=self.CREDENTIAL_SOURCE,
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          service_account_impersonation_options={"token_lifetime_seconds": 2800},
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          quota_project_id=self.QUOTA_PROJECT_ID,
          scopes=["email"],
          default_scopes=["default2"],
          universe_domain=external_account._DEFAULT_UNIVERSE_DOMAIN,
      )
      mock_init.assert_called_once_with(**expected_kwargs)
  def test_with_token_uri(self):
      """``with_token_uri`` returns credentials using the new token URL."""
      replacement_uri = "https://eu-sts.googleapis.com/v1/token"
      original = self.make_credentials()
      assert original._token_url == self.TOKEN_URL

      updated = original.with_token_uri(replacement_uri)

      assert updated._token_url == replacement_uri
  def test_with_token_uri_workforce_pool(self):
      """``with_token_uri`` keeps the workforce pool user project."""
      replacement_uri = "https://eu-sts.googleapis.com/v1/token"
      original = self.make_workforce_pool_credentials(
          workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
      )
      assert original._token_url == self.TOKEN_URL

      updated = original.with_token_uri(replacement_uri)

      assert updated._token_url == replacement_uri
      user_project = updated.info.get("workforce_pool_user_project")
      assert user_project == self.WORKFORCE_POOL_USER_PROJECT
  def test_with_quota_project(self):
      """``with_quota_project`` sets the quota project on the new credentials."""
      original = self.make_credentials()
      assert not original.scopes
      assert not original.quota_project_id

      updated = original.with_quota_project("project-foo")

      assert updated.quota_project_id == "project-foo"
  def test_with_quota_project_workforce_pool(self):
      """``with_quota_project`` keeps the workforce pool user project."""
      original = self.make_workforce_pool_credentials(
          workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
      )
      assert not original.scopes
      assert not original.quota_project_id

      updated = original.with_quota_project("project-foo")

      assert updated.quota_project_id == "project-foo"
      user_project = updated.info.get("workforce_pool_user_project")
      assert user_project == self.WORKFORCE_POOL_USER_PROJECT
  def test_with_quota_project_full_options_propagated(self):
      """``with_quota_project`` re-initialises with every option preserved."""
      impersonation_options = {"token_lifetime_seconds": 2800}
      original = self.make_credentials(
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          token_info_url=self.TOKEN_INFO_URL,
          quota_project_id=self.QUOTA_PROJECT_ID,
          scopes=self.SCOPES,
          default_scopes=["default1"],
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          service_account_impersonation_options=impersonation_options,
      )

      with mock.patch.object(
          external_account.Credentials, "__init__", return_value=None
      ) as mock_init:
          original.with_quota_project("project-foo")

      # The base-class initialiser must have been called exactly once with
      # the original configuration and the new quota project ID.
      expected_kwargs = dict(
          audience=self.AUDIENCE,
          subject_token_type=self.SUBJECT_TOKEN_TYPE,
          token_url=self.TOKEN_URL,
          token_info_url=self.TOKEN_INFO_URL,
          credential_source=self.CREDENTIAL_SOURCE,
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          service_account_impersonation_options={"token_lifetime_seconds": 2800},
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          quota_project_id="project-foo",
          scopes=self.SCOPES,
          default_scopes=["default1"],
          universe_domain=external_account._DEFAULT_UNIVERSE_DOMAIN,
      )
      mock_init.assert_called_once_with(**expected_kwargs)
  def test_with_invalid_impersonation_target_principal(self):
      """An impersonation URL with no target principal raises RefreshError."""
      bad_url = "https://iamcredentials.googleapis.com/v1/invalid"

      with pytest.raises(exceptions.RefreshError) as excinfo:
          self.make_credentials(service_account_impersonation_url=bad_url)

      expected_message = r"Unable to determine target principal from service account impersonation URL."
      assert excinfo.match(expected_message)
  def test_info(self):
      """``info`` reports the configuration, including a custom universe."""
      creds = self.make_credentials(universe_domain="dummy_universe.com")

      expected_info = {
          "type": "external_account",
          "audience": self.AUDIENCE,
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
          "token_url": self.TOKEN_URL,
          "credential_source": self.CREDENTIAL_SOURCE.copy(),
          "universe_domain": "dummy_universe.com",
      }
      assert creds.info == expected_info
  def test_universe_domain(self):
      """``universe_domain`` is the supplied value, else the library default."""
      custom = self.make_credentials(universe_domain="dummy_universe.com")
      assert custom.universe_domain == "dummy_universe.com"

      defaulted = self.make_credentials()
      assert defaulted.universe_domain == external_account._DEFAULT_UNIVERSE_DOMAIN
  def test_with_universe_domain(self):
      """``with_universe_domain`` returns credentials with the new domain."""
      original = self.make_credentials()

      updated = original.with_universe_domain("dummy_universe.com")

      assert updated.universe_domain == "dummy_universe.com"
  def test_info_workforce_pool(self):
      """``info`` for workforce credentials includes the user project."""
      creds = self.make_workforce_pool_credentials(
          workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
      )

      expected_info = {
          "type": "external_account",
          "audience": self.WORKFORCE_AUDIENCE,
          "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
          "token_url": self.TOKEN_URL,
          "credential_source": self.CREDENTIAL_SOURCE.copy(),
          "workforce_pool_user_project": self.WORKFORCE_POOL_USER_PROJECT,
          "universe_domain": external_account._DEFAULT_UNIVERSE_DOMAIN,
      }
      assert creds.info == expected_info
  def test_info_with_full_options(self):
      """``info`` reports every optional setting that was supplied."""
      creds = self.make_credentials(
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          quota_project_id=self.QUOTA_PROJECT_ID,
          token_info_url=self.TOKEN_INFO_URL,
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          service_account_impersonation_options={"token_lifetime_seconds": 2800},
      )

      # The impersonation options appear under "service_account_impersonation".
      expected_info = {
          "type": "external_account",
          "audience": self.AUDIENCE,
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
          "token_url": self.TOKEN_URL,
          "token_info_url": self.TOKEN_INFO_URL,
          "service_account_impersonation_url": self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          "service_account_impersonation": {"token_lifetime_seconds": 2800},
          "credential_source": self.CREDENTIAL_SOURCE.copy(),
          "quota_project_id": self.QUOTA_PROJECT_ID,
          "client_id": CLIENT_ID,
          "client_secret": CLIENT_SECRET,
          "universe_domain": external_account._DEFAULT_UNIVERSE_DOMAIN,
      }
      assert creds.info == expected_info
  def test_service_account_email_without_impersonation(self):
      """Without impersonation there is no service account email."""
      creds = self.make_credentials()

      assert creds.service_account_email is None
  def test_service_account_email_with_impersonation(self):
      """The service account email is derived from the impersonation URL."""
      creds = self.make_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
      )

      assert creds.service_account_email == SERVICE_ACCOUNT_EMAIL
  @pytest.mark.parametrize("audience", TEST_NON_USER_AUDIENCES)
  def test_is_user_with_non_users(self, audience):
      """Audiences from TEST_NON_USER_AUDIENCES are not user credentials."""
      init_kwargs = dict(
          audience=audience,
          subject_token_type=self.SUBJECT_TOKEN_TYPE,
          token_url=self.TOKEN_URL,
          credential_source=self.CREDENTIAL_SOURCE,
      )
      creds = CredentialsImpl(**init_kwargs)

      assert creds.is_user is False
  @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
  def test_is_user_with_users(self, audience):
      """Audiences from TEST_USER_AUDIENCES are user credentials."""
      init_kwargs = dict(
          audience=audience,
          subject_token_type=self.SUBJECT_TOKEN_TYPE,
          token_url=self.TOKEN_URL,
          credential_source=self.CREDENTIAL_SOURCE,
      )
      creds = CredentialsImpl(**init_kwargs)

      assert creds.is_user is True
  @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
  def test_is_user_with_users_and_impersonation(self, audience):
      """A workforce audience with impersonation is not a user credential."""
      # Configure service account impersonation on top of the audience.
      init_kwargs = dict(
          audience=audience,
          subject_token_type=self.SUBJECT_TOKEN_TYPE,
          token_url=self.TOKEN_URL,
          credential_source=self.CREDENTIAL_SOURCE,
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
      )
      creds = CredentialsImpl(**init_kwargs)

      # With impersonation in use the credentials represent a service
      # account rather than a user, despite the workforce pool audience.
      assert creds.is_user is False
  @pytest.mark.parametrize("audience", TEST_NON_USER_AUDIENCES)
  def test_is_workforce_pool_with_non_users(self, audience):
      """Audiences from TEST_NON_USER_AUDIENCES are not workforce pools."""
      init_kwargs = dict(
          audience=audience,
          subject_token_type=self.SUBJECT_TOKEN_TYPE,
          token_url=self.TOKEN_URL,
          credential_source=self.CREDENTIAL_SOURCE,
      )
      creds = CredentialsImpl(**init_kwargs)

      assert creds.is_workforce_pool is False
  @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
  def test_is_workforce_pool_with_users(self, audience):
      """Audiences from TEST_USER_AUDIENCES are workforce pools."""
      init_kwargs = dict(
          audience=audience,
          subject_token_type=self.SUBJECT_TOKEN_TYPE,
          token_url=self.TOKEN_URL,
          credential_source=self.CREDENTIAL_SOURCE,
      )
      creds = CredentialsImpl(**init_kwargs)

      assert creds.is_workforce_pool is True
  @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
  def test_is_workforce_pool_with_users_and_impersonation(self, audience):
      """Impersonation does not change the workforce pool classification."""
      # Workforce audience combined with service account impersonation.
      init_kwargs = dict(
          audience=audience,
          subject_token_type=self.SUBJECT_TOKEN_TYPE,
          token_url=self.TOKEN_URL,
          credential_source=self.CREDENTIAL_SOURCE,
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
      )
      creds = CredentialsImpl(**init_kwargs)

      # is_workforce_pool stays True even though impersonation is used.
      assert creds.is_workforce_pool is True
  @pytest.mark.parametrize("mock_expires_in", [2800, "2800"])
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  def test_refresh_without_client_auth_success(
      self, unused_utcnow, mock_auth_lib_value, mock_expires_in
  ):
      """Refreshing without client auth stores the exchanged token and expiry.

      ``expires_in`` is supplied both as an int and as a numeric string. The
      clock is patched to ``datetime.datetime.min``, so the expected expiry
      is that instant plus the lifetime.
      """
      sts_payload = self.SUCCESS_RESPONSE.copy()
      # Use a custom lifetime to confirm the expiry is computed from it.
      sts_payload["expires_in"] = mock_expires_in
      lifetime = datetime.timedelta(seconds=int(mock_expires_in))
      expected_expiry = datetime.datetime.min + lifetime

      expected_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
      }
      expected_body = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
      }

      mock_request = self.make_mock_request(status=http_client.OK, data=sts_payload)
      creds = self.make_credentials()
      creds.refresh(mock_request)

      self.assert_token_request_kwargs(
          mock_request.call_args[1], expected_headers, expected_body
      )
      assert creds.valid
      assert creds.expiry == expected_expiry
      assert not creds.expired
      assert creds.token == sts_payload["access_token"]
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  def test_refresh_workforce_without_client_auth_success(
      self, unused_utcnow, mock_auth_lib_value
  ):
      """Workforce refresh without client auth sends the user project in "options".

      utcnow is patched to datetime.min so the expected expiry is exact.
      The injected mocks are unused; they follow mock.patch's bottom-up
      decorator order (utcnow first, then the library-version patch).
      """
      response = self.SUCCESS_RESPONSE.copy()
      # Test custom expiration to confirm expiry is set correctly.
      response["expires_in"] = 2800
      expected_expiry = datetime.datetime.min + datetime.timedelta(
          seconds=response["expires_in"]
      )
      headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
      }
      # Without client auth, the workforce user project is expected to be
      # passed as URL-quoted JSON in the "options" field.
      request_data = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.WORKFORCE_AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
          "options": urllib.parse.quote(
              json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT})
          ),
      }
      request = self.make_mock_request(status=http_client.OK, data=response)
      credentials = self.make_workforce_pool_credentials(
          workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
      )

      credentials.refresh(request)

      self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
      assert credentials.valid
      assert credentials.expiry == expected_expiry
      assert not credentials.expired
      assert credentials.token == response["access_token"]
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  def test_refresh_workforce_with_client_auth_success(
      self, unused_utcnow, mock_auth_lib_value
  ):
      """Workforce refresh with client auth sends Basic auth and no "options" field."""
      lifetime_seconds = 2800
      # Use a custom expiration so the computed expiry can be checked exactly.
      sts_response = self.SUCCESS_RESPONSE.copy()
      sts_response.update({"expires_in": lifetime_seconds})
      expected_expiry = datetime.datetime.min + datetime.timedelta(
          seconds=sts_response["expires_in"]
      )

      expected_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
      }
      expected_body = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.WORKFORCE_AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
      }

      request = self.make_mock_request(status=http_client.OK, data=sts_response)
      # Client auth takes priority over workforce_pool_user_project.
      credentials = self.make_workforce_pool_credentials(
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
      )
      credentials.refresh(request)

      self.assert_token_request_kwargs(
          request.call_args[1], expected_headers, expected_body
      )
      assert credentials.valid
      assert credentials.expiry == expected_expiry
      assert not credentials.expired
      assert credentials.token == sts_response["access_token"]
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  def test_refresh_workforce_with_client_auth_and_no_workforce_project_success(
      self, unused_utcnow, mock_lib_version_value
  ):
      """Workforce refresh succeeds with client auth when no user project is given."""
      lifetime_seconds = 2800
      # Use a custom expiration so the computed expiry can be checked exactly.
      sts_response = self.SUCCESS_RESPONSE.copy()
      sts_response.update({"expires_in": lifetime_seconds})
      expected_expiry = datetime.datetime.min + datetime.timedelta(
          seconds=sts_response["expires_in"]
      )

      expected_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
      }
      expected_body = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.WORKFORCE_AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
      }

      request = self.make_mock_request(status=http_client.OK, data=sts_response)
      # Client auth alone is sufficient for user project determination.
      credentials = self.make_workforce_pool_credentials(
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          workforce_pool_user_project=None,
      )
      credentials.refresh(request)

      self.assert_token_request_kwargs(
          request.call_args[1], expected_headers, expected_body
      )
      assert credentials.valid
      assert credentials.expiry == expected_expiry
      assert not credentials.expired
      assert credentials.token == sts_response["access_token"]
  @mock.patch(
      "google.auth.metrics.token_request_access_token_impersonate",
      return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  )
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  def test_refresh_impersonation_without_client_auth_success(
      self, mock_auth_lib_value, mock_metrics_header_value
  ):
      """Refresh with impersonation makes an STS exchange, then an impersonation call.

      Verifies both outgoing requests and that the credentials end up holding
      the impersonated service account token and its expiry.

      mock.patch decorators inject their mocks bottom-up, so the first
      argument is the python_and_auth_lib_version mock and the second is the
      token_request_access_token_impersonate mock. Neither is used directly.
      """
      # Simulate service account access token expires in 2800 seconds.
      expire_time = (
          _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
      ).isoformat("T") + "Z"
      expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")

      # STS token exchange request/response.
      token_response = self.SUCCESS_RESPONSE.copy()
      token_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
      }
      token_request_data = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
          "scope": "https://www.googleapis.com/auth/iam",
      }

      # Service account impersonation request/response.
      impersonation_response = {
          "accessToken": "SA_ACCESS_TOKEN",
          "expireTime": expire_time,
      }
      impersonation_headers = {
          "Content-Type": "application/json",
          "authorization": "Bearer {}".format(token_response["access_token"]),
          "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
          "x-allowed-locations": "0x0",
      }
      impersonation_request_data = {
          "delegates": None,
          "scope": self.SCOPES,
          "lifetime": "3600s",
      }

      # Initialize mock request to handle token exchange and service account
      # impersonation request.
      request = self.make_mock_request(
          status=http_client.OK,
          data=token_response,
          impersonation_status=http_client.OK,
          impersonation_data=impersonation_response,
      )
      # Initialize credentials with service account impersonation.
      credentials = self.make_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=self.SCOPES,
      )

      credentials.refresh(request)

      # Only 2 requests should be processed.
      assert len(request.call_args_list) == 2
      # Verify token exchange request parameters.
      self.assert_token_request_kwargs(
          request.call_args_list[0][1], token_headers, token_request_data
      )
      # Verify service account impersonation request parameters.
      self.assert_impersonation_request_kwargs(
          request.call_args_list[1][1],
          impersonation_headers,
          impersonation_request_data,
      )
      assert credentials.valid
      assert credentials.expiry == expected_expiry
      assert not credentials.expired
      assert credentials.token == impersonation_response["accessToken"]
  @mock.patch(
      "google.auth.metrics.token_request_access_token_impersonate",
      return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  )
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  def test_refresh_workforce_impersonation_without_client_auth_success(
      self, mock_auth_lib_value, mock_metrics_header_value
  ):
      """Workforce refresh with impersonation sends the user project in "options".

      Verifies the STS exchange and the impersonation request, and that the
      credentials end up holding the impersonated token and its expiry.

      mock.patch decorators inject their mocks bottom-up, so the first
      argument is the python_and_auth_lib_version mock and the second is the
      token_request_access_token_impersonate mock. Neither is used directly.
      """
      # Simulate service account access token expires in 2800 seconds.
      expire_time = (
          _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
      ).isoformat("T") + "Z"
      expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")

      # STS token exchange request/response.
      token_response = self.SUCCESS_RESPONSE.copy()
      token_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
      }
      token_request_data = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.WORKFORCE_AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
          "scope": "https://www.googleapis.com/auth/iam",
          "options": urllib.parse.quote(
              json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT})
          ),
      }

      # Service account impersonation request/response.
      impersonation_response = {
          "accessToken": "SA_ACCESS_TOKEN",
          "expireTime": expire_time,
      }
      impersonation_headers = {
          "Content-Type": "application/json",
          "authorization": "Bearer {}".format(token_response["access_token"]),
          "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
          "x-allowed-locations": "0x0",
      }
      impersonation_request_data = {
          "delegates": None,
          "scope": self.SCOPES,
          "lifetime": "3600s",
      }

      # Initialize mock request to handle token exchange and service account
      # impersonation request.
      request = self.make_mock_request(
          status=http_client.OK,
          data=token_response,
          impersonation_status=http_client.OK,
          impersonation_data=impersonation_response,
      )
      # Initialize credentials with service account impersonation.
      credentials = self.make_workforce_pool_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=self.SCOPES,
          workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
      )

      credentials.refresh(request)

      # Only 2 requests should be processed.
      assert len(request.call_args_list) == 2
      # Verify token exchange request parameters.
      self.assert_token_request_kwargs(
          request.call_args_list[0][1], token_headers, token_request_data
      )
      # Verify service account impersonation request parameters.
      self.assert_impersonation_request_kwargs(
          request.call_args_list[1][1],
          impersonation_headers,
          impersonation_request_data,
      )
      assert credentials.valid
      assert credentials.expiry == expected_expiry
      assert not credentials.expired
      assert credentials.token == impersonation_response["accessToken"]
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  def test_refresh_without_client_auth_success_explicit_user_scopes_ignore_default_scopes(
      self, mock_auth_lib_value
  ):
      """User-supplied scopes are sent to STS and default scopes are ignored."""
      expected_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
      }
      expected_body = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "scope": "scope1 scope2",
          "subject_token": "subject_token_0",
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
      }

      request = self.make_mock_request(
          status=http_client.OK, data=self.SUCCESS_RESPONSE
      )
      credentials = self.make_credentials(
          scopes=["scope1", "scope2"],
          # User scopes win, so this default scope must be ignored.
          default_scopes=["ignored"],
      )
      credentials.refresh(request)

      self.assert_token_request_kwargs(
          request.call_args[1], expected_headers, expected_body
      )
      assert credentials.valid
      assert not credentials.expired
      assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
      assert credentials.has_scopes(["scope1", "scope2"])
      assert not credentials.has_scopes(["ignored"])
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  def test_refresh_without_client_auth_success_explicit_default_scopes_only(
      self, mock_auth_lib_value
  ):
      """Default scopes are sent to STS when no user scopes are provided."""
      expected_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
      }
      expected_body = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "scope": "scope1 scope2",
          "subject_token": "subject_token_0",
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
      }

      request = self.make_mock_request(
          status=http_client.OK, data=self.SUCCESS_RESPONSE
      )
      credentials = self.make_credentials(
          scopes=None,
          # No user scopes, so the default scopes are the ones that apply.
          default_scopes=["scope1", "scope2"],
      )
      credentials.refresh(request)

      self.assert_token_request_kwargs(
          request.call_args[1], expected_headers, expected_body
      )
      assert credentials.valid
      assert not credentials.expired
      assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
      assert credentials.has_scopes(["scope1", "scope2"])
  def test_refresh_without_client_auth_error(self):
      """A BAD_REQUEST from the token endpoint raises OAuthError and leaves no token."""
      failing_request = self.make_mock_request(
          status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
      )
      credentials = self.make_credentials()
      expected_message = r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"

      with pytest.raises(exceptions.OAuthError) as excinfo:
          credentials.refresh(failing_request)

      assert excinfo.match(expected_message)
      assert not credentials.expired
      assert credentials.token is None
  def test_refresh_impersonation_without_client_auth_error(self):
      """A failed impersonation call raises RefreshError even if the STS exchange succeeded."""
      # The token exchange succeeds; only the impersonation step fails.
      mock_request_kwargs = {
          "status": http_client.OK,
          "data": self.SUCCESS_RESPONSE,
          "impersonation_status": http_client.BAD_REQUEST,
          "impersonation_data": self.IMPERSONATION_ERROR_RESPONSE,
      }
      failing_request = self.make_mock_request(**mock_request_kwargs)
      credentials = self.make_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=self.SCOPES,
      )

      with pytest.raises(exceptions.RefreshError) as excinfo:
          credentials.refresh(failing_request)

      assert excinfo.match(r"Unable to acquire impersonated credentials")
      assert not credentials.expired
      assert credentials.token is None
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  def test_refresh_with_client_auth_success(self, mock_auth_lib_value):
      """Refresh with a client ID and secret sends a Basic Authorization header."""
      expected_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
      }
      expected_body = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
      }

      request = self.make_mock_request(
          status=http_client.OK, data=self.SUCCESS_RESPONSE
      )
      credentials = self.make_credentials(
          client_id=CLIENT_ID, client_secret=CLIENT_SECRET
      )
      credentials.refresh(request)

      self.assert_token_request_kwargs(
          request.call_args[1], expected_headers, expected_body
      )
      assert credentials.valid
      assert not credentials.expired
      assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
  @mock.patch(
      "google.auth.metrics.token_request_access_token_impersonate",
      return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  )
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  def test_refresh_impersonation_with_client_auth_success_ignore_default_scopes(
      self, mock_auth_lib_value, mock_metrics_header_value
  ):
      """Impersonation with client auth uses the user scopes, not the default scopes.

      Verifies the STS exchange (with Basic auth) and the impersonation
      request, and that the credentials hold the impersonated token and expiry.

      mock.patch decorators inject their mocks bottom-up, so the first
      argument is the python_and_auth_lib_version mock and the second is the
      token_request_access_token_impersonate mock. Neither is used directly.
      """
      # Simulate service account access token expires in 2800 seconds.
      expire_time = (
          _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
      ).isoformat("T") + "Z"
      expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")

      # STS token exchange request/response.
      token_response = self.SUCCESS_RESPONSE.copy()
      token_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
      }
      token_request_data = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
          "scope": "https://www.googleapis.com/auth/iam",
      }

      # Service account impersonation request/response.
      impersonation_response = {
          "accessToken": "SA_ACCESS_TOKEN",
          "expireTime": expire_time,
      }
      impersonation_headers = {
          "Content-Type": "application/json",
          "authorization": "Bearer {}".format(token_response["access_token"]),
          "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
          "x-allowed-locations": "0x0",
      }
      impersonation_request_data = {
          "delegates": None,
          "scope": self.SCOPES,
          "lifetime": "3600s",
      }

      # Initialize mock request to handle token exchange and service account
      # impersonation request.
      request = self.make_mock_request(
          status=http_client.OK,
          data=token_response,
          impersonation_status=http_client.OK,
          impersonation_data=impersonation_response,
      )
      # Initialize credentials with service account impersonation and basic auth.
      credentials = self.make_credentials(
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=self.SCOPES,
          # Default scopes will be ignored since user scopes are specified.
          default_scopes=["ignored"],
      )

      credentials.refresh(request)

      # Only 2 requests should be processed.
      assert len(request.call_args_list) == 2
      # Verify token exchange request parameters.
      self.assert_token_request_kwargs(
          request.call_args_list[0][1], token_headers, token_request_data
      )
      # Verify service account impersonation request parameters.
      self.assert_impersonation_request_kwargs(
          request.call_args_list[1][1],
          impersonation_headers,
          impersonation_request_data,
      )
      assert credentials.valid
      assert credentials.expiry == expected_expiry
      assert not credentials.expired
      assert credentials.token == impersonation_response["accessToken"]
  @mock.patch(
      "google.auth.metrics.token_request_access_token_impersonate",
      return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  )
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  def test_refresh_impersonation_with_client_auth_success_use_default_scopes(
      self, mock_auth_lib_value, mock_metrics_header_value
  ):
      """Impersonation with client auth falls back to default scopes when user scopes are None.

      Verifies the STS exchange (with Basic auth) and the impersonation
      request, and that the credentials hold the impersonated token and expiry.

      mock.patch decorators inject their mocks bottom-up, so the first
      argument is the python_and_auth_lib_version mock and the second is the
      token_request_access_token_impersonate mock. Neither is used directly.
      """
      # Simulate service account access token expires in 2800 seconds.
      expire_time = (
          _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
      ).isoformat("T") + "Z"
      expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")

      # STS token exchange request/response.
      token_response = self.SUCCESS_RESPONSE.copy()
      token_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
      }
      token_request_data = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
          "scope": "https://www.googleapis.com/auth/iam",
      }

      # Service account impersonation request/response.
      impersonation_response = {
          "accessToken": "SA_ACCESS_TOKEN",
          "expireTime": expire_time,
      }
      impersonation_headers = {
          "Content-Type": "application/json",
          "authorization": "Bearer {}".format(token_response["access_token"]),
          "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
          "x-allowed-locations": "0x0",
      }
      impersonation_request_data = {
          "delegates": None,
          "scope": self.SCOPES,
          "lifetime": "3600s",
      }

      # Initialize mock request to handle token exchange and service account
      # impersonation request.
      request = self.make_mock_request(
          status=http_client.OK,
          data=token_response,
          impersonation_status=http_client.OK,
          impersonation_data=impersonation_response,
      )
      # Initialize credentials with service account impersonation and basic auth.
      credentials = self.make_credentials(
          client_id=CLIENT_ID,
          client_secret=CLIENT_SECRET,
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=None,
          # Default scopes will be used since user specified scopes are none.
          default_scopes=self.SCOPES,
      )

      credentials.refresh(request)

      # Only 2 requests should be processed.
      assert len(request.call_args_list) == 2
      # Verify token exchange request parameters.
      self.assert_token_request_kwargs(
          request.call_args_list[0][1], token_headers, token_request_data
      )
      # Verify service account impersonation request parameters.
      self.assert_impersonation_request_kwargs(
          request.call_args_list[1][1],
          impersonation_headers,
          impersonation_request_data,
      )
      assert credentials.valid
      assert credentials.expiry == expected_expiry
      assert not credentials.expired
      assert credentials.token == impersonation_response["accessToken"]
  def test_apply_without_quota_project_id(self):
      """apply() adds the bearer token and x-allowed-locations, with no quota project header."""
      outgoing_headers = {}
      request = self.make_mock_request(
          status=http_client.OK, data=self.SUCCESS_RESPONSE
      )
      credentials = self.make_credentials()

      credentials.refresh(request)
      credentials.apply(outgoing_headers)

      bearer_value = "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"])
      assert outgoing_headers == {
          "authorization": bearer_value,
          "x-allowed-locations": "0x0",
      }
  def test_apply_workforce_without_quota_project_id(self):
      """apply() on workforce credentials adds no quota project header."""
      outgoing_headers = {}
      request = self.make_mock_request(
          status=http_client.OK, data=self.SUCCESS_RESPONSE
      )
      credentials = self.make_workforce_pool_credentials(
          workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
      )

      credentials.refresh(request)
      credentials.apply(outgoing_headers)

      bearer_value = "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"])
      assert outgoing_headers == {
          "authorization": bearer_value,
          "x-allowed-locations": "0x0",
      }
  def test_apply_impersonation_without_quota_project_id(self):
      """apply() with impersonation uses the service account token as the bearer token."""
      # The impersonated token expires 3600 seconds from now.
      expires_at = _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(
          seconds=3600
      )
      expire_time = expires_at.isoformat("T") + "Z"
      # Service account impersonation response.
      impersonation_response = {
          "accessToken": "SA_ACCESS_TOKEN",
          "expireTime": expire_time,
      }
      # The mock request serves the token exchange and then the
      # impersonation call.
      request = self.make_mock_request(
          status=http_client.OK,
          data=self.SUCCESS_RESPONSE.copy(),
          impersonation_status=http_client.OK,
          impersonation_data=impersonation_response,
      )
      # Credentials configured for service account impersonation.
      credentials = self.make_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=self.SCOPES,
      )
      outgoing_headers = {}

      credentials.refresh(request)
      credentials.apply(outgoing_headers)

      bearer_value = "Bearer {}".format(impersonation_response["accessToken"])
      assert outgoing_headers == {
          "authorization": bearer_value,
          "x-allowed-locations": "0x0",
      }
  def test_apply_with_quota_project_id(self):
      """apply() keeps existing headers and adds x-goog-user-project when a quota project is set."""
      outgoing_headers = {"other": "header-value"}
      request = self.make_mock_request(
          status=http_client.OK, data=self.SUCCESS_RESPONSE
      )
      credentials = self.make_credentials(quota_project_id=self.QUOTA_PROJECT_ID)

      credentials.refresh(request)
      credentials.apply(outgoing_headers)

      bearer_value = "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"])
      assert outgoing_headers == {
          "other": "header-value",
          "authorization": bearer_value,
          "x-goog-user-project": self.QUOTA_PROJECT_ID,
          "x-allowed-locations": "0x0",
      }
  def test_apply_impersonation_with_quota_project_id(self):
      """apply() with impersonation and a quota project adds both the SA token and project header."""
      # The impersonated token expires 3600 seconds from now.
      expires_at = _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(
          seconds=3600
      )
      expire_time = expires_at.isoformat("T") + "Z"
      # Service account impersonation response.
      impersonation_response = {
          "accessToken": "SA_ACCESS_TOKEN",
          "expireTime": expire_time,
      }
      # The mock request serves the token exchange and then the
      # impersonation call.
      request = self.make_mock_request(
          status=http_client.OK,
          data=self.SUCCESS_RESPONSE.copy(),
          impersonation_status=http_client.OK,
          impersonation_data=impersonation_response,
      )
      # Credentials configured for impersonation plus a quota project.
      credentials = self.make_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=self.SCOPES,
          quota_project_id=self.QUOTA_PROJECT_ID,
      )
      outgoing_headers = {"other": "header-value"}

      credentials.refresh(request)
      credentials.apply(outgoing_headers)

      bearer_value = "Bearer {}".format(impersonation_response["accessToken"])
      assert outgoing_headers == {
          "other": "header-value",
          "authorization": bearer_value,
          "x-goog-user-project": self.QUOTA_PROJECT_ID,
          "x-allowed-locations": "0x0",
      }
  def test_before_request(self):
      """before_request() yields the same authorized headers on two consecutive calls."""
      headers = {"other": "header-value"}
      request = self.make_mock_request(
          status=http_client.OK, data=self.SUCCESS_RESPONSE
      )
      credentials = self.make_credentials()
      expected_headers = {
          "other": "header-value",
          "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
          "x-allowed-locations": "0x0",
      }

      # First call should call refresh, setting the token.
      credentials.before_request(request, "POST", "https://example.com/api", headers)
      assert headers == expected_headers

      # Second call shouldn't call refresh.
      credentials.before_request(request, "POST", "https://example.com/api", headers)
      assert headers == expected_headers
  def test_before_request_workforce(self):
      """before_request() on workforce credentials yields the same headers on two calls."""
      headers = {"other": "header-value"}
      request = self.make_mock_request(
          status=http_client.OK, data=self.SUCCESS_RESPONSE
      )
      credentials = self.make_workforce_pool_credentials(
          workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
      )
      expected_headers = {
          "other": "header-value",
          "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
          "x-allowed-locations": "0x0",
      }

      # First call should call refresh, setting the token.
      credentials.before_request(request, "POST", "https://example.com/api", headers)
      assert headers == expected_headers

      # Second call shouldn't call refresh.
      credentials.before_request(request, "POST", "https://example.com/api", headers)
      assert headers == expected_headers
  def test_before_request_impersonation(self):
      """before_request() with impersonation applies the service account token on both calls."""
      # The impersonated token expires 3600 seconds from now.
      expires_at = _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(
          seconds=3600
      )
      expire_time = expires_at.isoformat("T") + "Z"
      # Service account impersonation response.
      impersonation_response = {
          "accessToken": "SA_ACCESS_TOKEN",
          "expireTime": expire_time,
      }
      # The mock request serves the token exchange and then the
      # impersonation call.
      request = self.make_mock_request(
          status=http_client.OK,
          data=self.SUCCESS_RESPONSE.copy(),
          impersonation_status=http_client.OK,
          impersonation_data=impersonation_response,
      )
      headers = {"other": "header-value"}
      credentials = self.make_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
      )
      expected_headers = {
          "other": "header-value",
          "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
          "x-allowed-locations": "0x0",
      }

      # First call should call refresh, setting the token.
      credentials.before_request(request, "POST", "https://example.com/api", headers)
      assert headers == expected_headers

      # Second call shouldn't call refresh.
      credentials.before_request(request, "POST", "https://example.com/api", headers)
      assert headers == expected_headers
  @mock.patch("google.auth._helpers.utcnow")
  def test_before_request_expired(self, utcnow):
      """Check that before_request replaces a cached token once it goes stale.

      The clock (google.auth._helpers.utcnow) is patched so the test can move
      time forward by exactly one second across the refresh threshold.
      """
      now = datetime.datetime.min
      one_second = datetime.timedelta(seconds=1)
      url = "https://example.com/api"
      request_headers = {}
      mock_request = self.make_mock_request(
          status=http_client.OK, data=self.SUCCESS_RESPONSE
      )
      credentials = self.make_credentials()
      credentials.token = "token"
      utcnow.return_value = now
      # Expiry sits one second beyond "now + clock skew accommodation", so the
      # credentials are still considered valid at this instant.
      credentials.expiry = now + _helpers.REFRESH_THRESHOLD + one_second

      assert credentials.valid
      assert not credentials.expired
      assert credentials.token_state == TokenState.FRESH

      credentials.before_request(mock_request, "POST", url, request_headers)

      # The manually seeded token is applied as-is.
      assert request_headers == {
          "authorization": "Bearer token",
          "x-allowed-locations": "0x0",
      }

      # Advance the patched clock by one second.
      utcnow.return_value = now + one_second

      assert not credentials.valid
      assert credentials.expired
      assert credentials.token_state == TokenState.STALE

      credentials.before_request(mock_request, "POST", url, request_headers)

      assert credentials.token_state == TokenState.FRESH
      # The headers now carry the token from the mocked token response.
      assert request_headers == {
          "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
          "x-allowed-locations": "0x0",
      }
  @mock.patch("google.auth._helpers.utcnow")
  def test_before_request_impersonation_expired(self, utcnow):
      """Check stale-token replacement when service account impersonation is set.

      The clock (google.auth._helpers.utcnow) is patched so the test can move
      time forward by exactly one second across the refresh threshold; after
      the refresh the headers must carry the impersonated token.
      """
      now = datetime.datetime.min
      one_second = datetime.timedelta(seconds=1)
      url = "https://example.com/api"
      request_headers = {}
      # Payload returned by the mocked service account impersonation endpoint.
      sa_token_payload = {
          "accessToken": "SA_ACCESS_TOKEN",
          "expireTime": (now + datetime.timedelta(seconds=3601)).isoformat("T")
          + "Z",
      }
      # One mock transport serves both the token exchange and the
      # impersonation request.
      mock_request = self.make_mock_request(
          status=http_client.OK,
          data=self.SUCCESS_RESPONSE.copy(),
          impersonation_status=http_client.OK,
          impersonation_data=sa_token_payload,
      )
      credentials = self.make_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
      )
      credentials.token = "token"
      utcnow.return_value = now
      # Expiry sits one second beyond "now + clock skew accommodation", so the
      # credentials are still considered valid at this instant.
      credentials.expiry = now + _helpers.REFRESH_THRESHOLD + one_second

      assert credentials.valid
      assert not credentials.expired
      assert credentials.token_state == TokenState.FRESH

      credentials.before_request(mock_request, "POST", url, request_headers)
      assert credentials.token_state == TokenState.FRESH

      # The manually seeded token is applied as-is.
      assert request_headers == {
          "authorization": "Bearer token",
          "x-allowed-locations": "0x0",
      }

      # Advance the patched clock by one second, which crosses the expiration
      # threshold.
      utcnow.return_value = now + one_second

      assert not credentials.valid
      assert credentials.expired
      assert credentials.token_state == TokenState.STALE

      credentials.before_request(mock_request, "POST", url, request_headers)
      assert credentials.token_state == TokenState.FRESH

      credentials.before_request(mock_request, "POST", url, request_headers)

      # The headers now carry the impersonated service account token.
      assert request_headers == {
          "authorization": "Bearer {}".format(sa_token_payload["accessToken"]),
          "x-allowed-locations": "0x0",
      }
  @pytest.mark.parametrize(
      "audience",
      [
          # Legacy K8s audience format.
          "identitynamespace:1f12345:my_provider",
          # Unrealistic audiences.
          "//iam.googleapis.com/projects",
          "//iam.googleapis.com/projects/",
          "//iam.googleapis.com/project/123456",
          "//iam.googleapis.com/projects//123456",
          "//iam.googleapis.com/prefix_projects/123456",
          "//iam.googleapis.com/projects_suffix/123456",
      ],
  )
  def test_project_number_indeterminable(self, audience):
      """Check audiences for which both project number and project ID are None."""
      init_kwargs = {
          "audience": audience,
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
          "token_url": self.TOKEN_URL,
          "credential_source": self.CREDENTIAL_SOURCE,
      }
      credentials = CredentialsImpl(**init_kwargs)

      assert credentials.project_number is None
      assert credentials.get_project_id(None) is None
  def test_project_number_determinable(self):
      """Check that the standard test audience yields the expected project number."""
      init_kwargs = {
          "audience": self.AUDIENCE,
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
          "token_url": self.TOKEN_URL,
          "credential_source": self.CREDENTIAL_SOURCE,
      }
      credentials = CredentialsImpl(**init_kwargs)

      assert credentials.project_number == self.PROJECT_NUMBER
  def test_project_number_workforce(self):
      """Check that a workforce audience yields no project number."""
      init_kwargs = {
          "audience": self.WORKFORCE_AUDIENCE,
          "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
          "token_url": self.TOKEN_URL,
          "credential_source": self.CREDENTIAL_SOURCE,
          "workforce_pool_user_project": self.WORKFORCE_POOL_USER_PROJECT,
      }
      credentials = CredentialsImpl(**init_kwargs)

      assert credentials.project_number is None
  def test_project_id_without_scopes(self):
      """Check that get_project_id returns None for credentials built without scopes."""
      # No scopes are passed to the constructor.
      init_kwargs = {
          "audience": self.AUDIENCE,
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
          "token_url": self.TOKEN_URL,
          "credential_source": self.CREDENTIAL_SOURCE,
      }
      scopeless_credentials = CredentialsImpl(**init_kwargs)

      assert scopeless_credentials.get_project_id(None) is None
  @mock.patch(
      "google.auth.metrics.token_request_access_token_impersonate",
      return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  )
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  def test_get_project_id_cloud_resource_manager_success(
      self, mock_metrics_header_value, mock_auth_lib_value
  ):
      """Check get_project_id with impersonation and a successful CRM lookup.

      Three requests are expected in order: STS token exchange, service
      account impersonation, and the cloud resource manager call. A second
      get_project_id call must be served without any further request.

      NOTE: mock.patch decorators inject their mocks bottom-up, so the first
      mock parameter receives the ``python_and_auth_lib_version`` patch and the
      second receives the ``token_request_access_token_impersonate`` patch,
      which is the reverse of what the parameter names suggest. Neither
      parameter is referenced in the body.
      """
      # Expected STS token exchange request, and the response it yields.
      sts_response = self.SUCCESS_RESPONSE.copy()
      expected_sts_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
      }
      expected_sts_body = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
          "scope": "https://www.googleapis.com/auth/iam",
      }

      # Expected service account impersonation request, and its response.
      expire_time = (
          _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
      ).isoformat("T") + "Z"
      expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
      sa_token_payload = {
          "accessToken": "SA_ACCESS_TOKEN",
          "expireTime": expire_time,
      }
      expected_impersonation_headers = {
          "Content-Type": "application/json",
          "x-goog-user-project": self.QUOTA_PROJECT_ID,
          "authorization": "Bearer {}".format(sts_response["access_token"]),
          "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
          "x-allowed-locations": "0x0",
      }
      expected_impersonation_body = {
          "delegates": None,
          "scope": self.SCOPES,
          "lifetime": "3600s",
      }

      # One mock transport serves the token exchange, the impersonation
      # request and the cloud resource manager request.
      mock_request = self.make_mock_request(
          status=http_client.OK,
          data=self.SUCCESS_RESPONSE.copy(),
          impersonation_status=http_client.OK,
          impersonation_data=sa_token_payload,
          cloud_resource_manager_status=http_client.OK,
          cloud_resource_manager_data=self.CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE,
      )
      credentials = self.make_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          scopes=self.SCOPES,
          quota_project_id=self.QUOTA_PROJECT_ID,
      )

      # The project ID from the cloud resource manager response is returned.
      first_project_id = credentials.get_project_id(mock_request)
      assert first_project_id == self.PROJECT_ID

      # Exactly three requests were issued.
      assert len(mock_request.call_args_list) == 3
      sts_call, impersonation_call, crm_call = mock_request.call_args_list

      # Token exchange request parameters.
      self.assert_token_request_kwargs(
          sts_call[1], expected_sts_headers, expected_sts_body
      )
      # Service account impersonation request parameters.
      self.assert_impersonation_request_kwargs(
          impersonation_call[1],
          expected_impersonation_headers,
          expected_impersonation_body,
      )

      # Looking up the project ID also leaves a usable access token behind.
      assert credentials.valid
      assert credentials.expiry == expected_expiry
      assert not credentials.expired
      assert credentials.token == sa_token_payload["accessToken"]

      # Cloud resource manager request parameters.
      expected_crm_headers = {
          "x-goog-user-project": self.QUOTA_PROJECT_ID,
          "authorization": "Bearer {}".format(sa_token_payload["accessToken"]),
          "x-allowed-locations": "0x0",
      }
      self.assert_resource_manager_request_kwargs(
          crm_call[1], self.PROJECT_NUMBER, expected_crm_headers
      )

      # A repeated lookup returns the cached project ID ...
      second_project_id = credentials.get_project_id(mock_request)
      assert second_project_id == self.PROJECT_ID
      # ... without issuing any additional request.
      assert len(mock_request.call_args_list) == 3
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  def test_workforce_pool_get_project_id_cloud_resource_manager_success(
      self, mock_auth_lib_value
  ):
      """Check get_project_id for workforce pool credentials with a successful CRM lookup.

      Two requests are expected in order: STS token exchange and the cloud
      resource manager call. A second get_project_id call must be served
      without any further request.
      """
      # Expected STS token exchange request.
      user_project_options = urllib.parse.quote(
          json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT})
      )
      expected_sts_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
      }
      expected_sts_body = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.WORKFORCE_AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
          "scope": "scope1 scope2",
          "options": user_project_options,
      }

      # One mock transport serves the token exchange and the cloud resource
      # manager request.
      mock_request = self.make_mock_request(
          status=http_client.OK,
          data=self.SUCCESS_RESPONSE.copy(),
          cloud_resource_manager_status=http_client.OK,
          cloud_resource_manager_data=self.CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE,
      )
      workforce_credentials = self.make_workforce_pool_credentials(
          scopes=self.SCOPES,
          quota_project_id=self.QUOTA_PROJECT_ID,
          workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
      )

      # The project ID from the cloud resource manager response is returned.
      first_project_id = workforce_credentials.get_project_id(mock_request)
      assert first_project_id == self.PROJECT_ID

      # Exactly two requests were issued.
      assert len(mock_request.call_args_list) == 2
      sts_call, crm_call = mock_request.call_args_list

      # Token exchange request parameters.
      self.assert_token_request_kwargs(
          sts_call[1], expected_sts_headers, expected_sts_body
      )

      # Looking up the project ID also leaves a usable access token behind.
      assert workforce_credentials.valid
      assert not workforce_credentials.expired
      assert workforce_credentials.token == self.SUCCESS_RESPONSE["access_token"]

      # Cloud resource manager request parameters.
      expected_crm_headers = {
          "x-goog-user-project": self.QUOTA_PROJECT_ID,
          "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
          "x-allowed-locations": "0x0",
      }
      self.assert_resource_manager_request_kwargs(
          crm_call[1], self.WORKFORCE_POOL_USER_PROJECT, expected_crm_headers
      )

      # A repeated lookup returns the cached project ID ...
      second_project_id = workforce_credentials.get_project_id(mock_request)
      assert second_project_id == self.PROJECT_ID
      # ... without issuing any additional request.
      assert len(mock_request.call_args_list) == 2
  @mock.patch(
      "google.auth.metrics.token_request_access_token_impersonate",
      return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  )
  @mock.patch(
      "google.auth.metrics.python_and_auth_lib_version",
      return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  )
  def test_refresh_impersonation_with_lifetime(
      self, mock_metrics_header_value, mock_auth_lib_value
  ):
      """Check refresh with impersonation and a configured 2800 second lifetime.

      Two requests are expected in order: STS token exchange and service
      account impersonation, the latter carrying ``"lifetime": "2800s"``.

      NOTE: mock.patch decorators inject their mocks bottom-up, so the first
      mock parameter receives the ``python_and_auth_lib_version`` patch and the
      second receives the ``token_request_access_token_impersonate`` patch,
      which is the reverse of what the parameter names suggest. Neither
      parameter is referenced in the body.
      """
      # The impersonated access token expires 2800 seconds from now.
      expire_time = (
          _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
      ).isoformat("T") + "Z"
      expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")

      # Expected STS token exchange request, and the response it yields.
      sts_response = self.SUCCESS_RESPONSE.copy()
      expected_sts_headers = {
          "Content-Type": "application/x-www-form-urlencoded",
          "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/true",
      }
      expected_sts_body = {
          "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
          "audience": self.AUDIENCE,
          "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
          "subject_token": "subject_token_0",
          "subject_token_type": self.SUBJECT_TOKEN_TYPE,
          "scope": "https://www.googleapis.com/auth/iam",
      }

      # Expected service account impersonation request, and its response.
      sa_token_payload = {
          "accessToken": "SA_ACCESS_TOKEN",
          "expireTime": expire_time,
      }
      expected_impersonation_headers = {
          "Content-Type": "application/json",
          "authorization": "Bearer {}".format(sts_response["access_token"]),
          "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
          "x-allowed-locations": "0x0",
      }
      expected_impersonation_body = {
          "delegates": None,
          "scope": self.SCOPES,
          "lifetime": "2800s",
      }

      # One mock transport serves both the token exchange and the
      # impersonation request.
      mock_request = self.make_mock_request(
          status=http_client.OK,
          data=sts_response,
          impersonation_status=http_client.OK,
          impersonation_data=sa_token_payload,
      )
      # Credentials configured for impersonation with an explicit lifetime.
      impersonated_credentials = self.make_credentials(
          service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
          service_account_impersonation_options={"token_lifetime_seconds": 2800},
          scopes=self.SCOPES,
      )

      impersonated_credentials.refresh(mock_request)

      # Exactly two requests were issued.
      assert len(mock_request.call_args_list) == 2
      sts_call, impersonation_call = mock_request.call_args_list

      # Token exchange request parameters.
      self.assert_token_request_kwargs(
          sts_call[1], expected_sts_headers, expected_sts_body
      )
      # Service account impersonation request parameters.
      self.assert_impersonation_request_kwargs(
          impersonation_call[1],
          expected_impersonation_headers,
          expected_impersonation_body,
      )

      assert impersonated_credentials.valid
      assert impersonated_credentials.expiry == expected_expiry
      assert not impersonated_credentials.expired
      assert impersonated_credentials.token == sa_token_payload["accessToken"]
  def test_get_project_id_cloud_resource_manager_error(self):
      """Check that get_project_id returns None when the CRM call is unauthorized."""
      # The token exchange succeeds, but the cloud resource manager request
      # answers UNAUTHORIZED, simulating a resource without sufficient
      # permissions to access cloud resource manager.
      mock_request = self.make_mock_request(
          status=http_client.OK,
          data=self.SUCCESS_RESPONSE.copy(),
          cloud_resource_manager_status=http_client.UNAUTHORIZED,
      )
      scoped_credentials = self.make_credentials(scopes=self.SCOPES)

      resolved_project_id = scoped_credentials.get_project_id(mock_request)

      assert resolved_project_id is None
      # Only the STS request and the cloud resource manager request were sent.
      assert len(mock_request.call_args_list) == 2
  1708. )
  1709. credentials = self.make_credentials(scopes=self.SCOPES)
  1710. project_id = credentials.get_project_id(request)
  1711. assert project_id is None
  1712. # Only 2 requests to STS and cloud resource manager should be sent.
  1713. assert len(request.call_args_list) == 2