test_external_account.py 85 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113
  1. # Copyright 2020 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import datetime
  15. import http.client as http_client
  16. import json
  17. import urllib
  18. import mock
  19. import pytest # type: ignore
  20. from google.auth import _helpers
  21. from google.auth import exceptions
  22. from google.auth import external_account
  23. from google.auth import transport
  24. from google.auth.credentials import DEFAULT_UNIVERSE_DOMAIN
  25. from google.auth.credentials import TokenState
  26. IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
  27. "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/imp"
  28. )
  29. LANG_LIBRARY_METRICS_HEADER_VALUE = "gl-python/3.7 auth/1.1"
  30. CLIENT_ID = "username"
  31. CLIENT_SECRET = "password"
  32. # Base64 encoding of "username:password"
  33. BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
  34. SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
  35. # List of valid workforce pool audiences.
  36. TEST_USER_AUDIENCES = [
  37. "//iam.googleapis.com/locations/global/workforcePools/pool-id/providers/provider-id",
  38. "//iam.googleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
  39. "//iam.googleapis.com/locations/eu/workforcePools/workloadIdentityPools/providers/provider-id",
  40. ]
  41. # Workload identity pool audiences or invalid workforce pool audiences.
  42. TEST_NON_USER_AUDIENCES = [
  43. # Legacy K8s audience format.
  44. "identitynamespace:1f12345:my_provider",
  45. (
  46. "//iam.googleapis.com/projects/123456/locations/"
  47. "global/workloadIdentityPools/pool-id/providers/"
  48. "provider-id"
  49. ),
  50. (
  51. "//iam.googleapis.com/projects/123456/locations/"
  52. "eu/workloadIdentityPools/pool-id/providers/"
  53. "provider-id"
  54. ),
  55. # Pool ID with workforcePools string.
  56. (
  57. "//iam.googleapis.com/projects/123456/locations/"
  58. "global/workloadIdentityPools/workforcePools/providers/"
  59. "provider-id"
  60. ),
  61. # Unrealistic / incorrect workforce pool audiences.
  62. "//iamgoogleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
  63. "//iam.googleapiscom/locations/eu/workforcePools/pool-id/providers/provider-id",
  64. "//iam.googleapis.com/locations/workforcePools/pool-id/providers/provider-id",
  65. "//iam.googleapis.com/locations/eu/workforcePool/pool-id/providers/provider-id",
  66. "//iam.googleapis.com/locations//workforcePool/pool-id/providers/provider-id",
  67. ]
  68. class CredentialsImpl(external_account.Credentials):
  69. def __init__(self, **kwargs):
  70. super(CredentialsImpl, self).__init__(**kwargs)
  71. self._counter = 0
  72. def retrieve_subject_token(self, request):
  73. counter = self._counter
  74. self._counter += 1
  75. return "subject_token_{}".format(counter)
  76. class TestCredentials(object):
  77. TOKEN_URL = "https://sts.googleapis.com/v1/token"
  78. TOKEN_INFO_URL = "https://sts.googleapis.com/v1/introspect"
  79. PROJECT_NUMBER = "123456"
  80. POOL_ID = "POOL_ID"
  81. PROVIDER_ID = "PROVIDER_ID"
  82. AUDIENCE = (
  83. "//iam.googleapis.com/projects/{}"
  84. "/locations/global/workloadIdentityPools/{}"
  85. "/providers/{}"
  86. ).format(PROJECT_NUMBER, POOL_ID, PROVIDER_ID)
  87. WORKFORCE_AUDIENCE = (
  88. "//iam.googleapis.com/locations/global/workforcePools/{}/providers/{}"
  89. ).format(POOL_ID, PROVIDER_ID)
  90. WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER"
  91. SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
  92. WORKFORCE_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token"
  93. CREDENTIAL_SOURCE = {"file": "/var/run/secrets/goog.id/token"}
  94. SUCCESS_RESPONSE = {
  95. "access_token": "ACCESS_TOKEN",
  96. "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
  97. "token_type": "Bearer",
  98. "expires_in": 3600,
  99. "scope": "scope1 scope2",
  100. }
  101. ERROR_RESPONSE = {
  102. "error": "invalid_request",
  103. "error_description": "Invalid subject token",
  104. "error_uri": "https://tools.ietf.org/html/rfc6749",
  105. }
  106. QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
  107. SERVICE_ACCOUNT_IMPERSONATION_URL = (
  108. "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
  109. + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
  110. )
  111. SCOPES = ["scope1", "scope2"]
  112. IMPERSONATION_ERROR_RESPONSE = {
  113. "error": {
  114. "code": 400,
  115. "message": "Request contains an invalid argument",
  116. "status": "INVALID_ARGUMENT",
  117. }
  118. }
  119. PROJECT_ID = "my-proj-id"
  120. CLOUD_RESOURCE_MANAGER_URL = (
  121. "https://cloudresourcemanager.googleapis.com/v1/projects/"
  122. )
  123. CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE = {
  124. "projectNumber": PROJECT_NUMBER,
  125. "projectId": PROJECT_ID,
  126. "lifecycleState": "ACTIVE",
  127. "name": "project-name",
  128. "createTime": "2018-11-06T04:42:54.109Z",
  129. "parent": {"type": "folder", "id": "12345678901"},
  130. }
  131. @classmethod
  132. def make_credentials(
  133. cls,
  134. client_id=None,
  135. client_secret=None,
  136. quota_project_id=None,
  137. token_info_url=None,
  138. scopes=None,
  139. default_scopes=None,
  140. service_account_impersonation_url=None,
  141. service_account_impersonation_options={},
  142. universe_domain=DEFAULT_UNIVERSE_DOMAIN,
  143. ):
  144. return CredentialsImpl(
  145. audience=cls.AUDIENCE,
  146. subject_token_type=cls.SUBJECT_TOKEN_TYPE,
  147. token_url=cls.TOKEN_URL,
  148. token_info_url=token_info_url,
  149. service_account_impersonation_url=service_account_impersonation_url,
  150. service_account_impersonation_options=service_account_impersonation_options,
  151. credential_source=cls.CREDENTIAL_SOURCE,
  152. client_id=client_id,
  153. client_secret=client_secret,
  154. quota_project_id=quota_project_id,
  155. scopes=scopes,
  156. default_scopes=default_scopes,
  157. universe_domain=universe_domain,
  158. )
  159. @classmethod
  160. def make_workforce_pool_credentials(
  161. cls,
  162. client_id=None,
  163. client_secret=None,
  164. quota_project_id=None,
  165. scopes=None,
  166. default_scopes=None,
  167. service_account_impersonation_url=None,
  168. workforce_pool_user_project=None,
  169. ):
  170. return CredentialsImpl(
  171. audience=cls.WORKFORCE_AUDIENCE,
  172. subject_token_type=cls.WORKFORCE_SUBJECT_TOKEN_TYPE,
  173. token_url=cls.TOKEN_URL,
  174. service_account_impersonation_url=service_account_impersonation_url,
  175. credential_source=cls.CREDENTIAL_SOURCE,
  176. client_id=client_id,
  177. client_secret=client_secret,
  178. quota_project_id=quota_project_id,
  179. scopes=scopes,
  180. default_scopes=default_scopes,
  181. workforce_pool_user_project=workforce_pool_user_project,
  182. )
  183. @classmethod
  184. def make_mock_request(
  185. cls,
  186. status=http_client.OK,
  187. data=None,
  188. impersonation_status=None,
  189. impersonation_data=None,
  190. cloud_resource_manager_status=None,
  191. cloud_resource_manager_data=None,
  192. ):
  193. # STS token exchange request.
  194. token_response = mock.create_autospec(transport.Response, instance=True)
  195. token_response.status = status
  196. token_response.data = json.dumps(data).encode("utf-8")
  197. responses = [token_response]
  198. # If service account impersonation is requested, mock the expected response.
  199. if impersonation_status:
  200. impersonation_response = mock.create_autospec(
  201. transport.Response, instance=True
  202. )
  203. impersonation_response.status = impersonation_status
  204. impersonation_response.data = json.dumps(impersonation_data).encode("utf-8")
  205. responses.append(impersonation_response)
  206. # If cloud resource manager is requested, mock the expected response.
  207. if cloud_resource_manager_status:
  208. cloud_resource_manager_response = mock.create_autospec(
  209. transport.Response, instance=True
  210. )
  211. cloud_resource_manager_response.status = cloud_resource_manager_status
  212. cloud_resource_manager_response.data = json.dumps(
  213. cloud_resource_manager_data
  214. ).encode("utf-8")
  215. responses.append(cloud_resource_manager_response)
  216. request = mock.create_autospec(transport.Request)
  217. request.side_effect = responses
  218. return request
  219. @classmethod
  220. def assert_token_request_kwargs(
  221. cls, request_kwargs, headers, request_data, cert=None
  222. ):
  223. assert request_kwargs["url"] == cls.TOKEN_URL
  224. assert request_kwargs["method"] == "POST"
  225. assert request_kwargs["headers"] == headers
  226. if cert is not None:
  227. assert request_kwargs["cert"] == cert
  228. else:
  229. assert "cert" not in request_kwargs
  230. assert request_kwargs["body"] is not None
  231. body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
  232. for (k, v) in body_tuples:
  233. assert v.decode("utf-8") == request_data[k.decode("utf-8")]
  234. assert len(body_tuples) == len(request_data.keys())
  235. @classmethod
  236. def assert_impersonation_request_kwargs(
  237. cls, request_kwargs, headers, request_data, cert=None
  238. ):
  239. assert request_kwargs["url"] == cls.SERVICE_ACCOUNT_IMPERSONATION_URL
  240. assert request_kwargs["method"] == "POST"
  241. assert request_kwargs["headers"] == headers
  242. if cert is not None:
  243. assert request_kwargs["cert"] == cert
  244. else:
  245. assert "cert" not in request_kwargs
  246. assert request_kwargs["body"] is not None
  247. body_json = json.loads(request_kwargs["body"].decode("utf-8"))
  248. assert body_json == request_data
  249. @classmethod
  250. def assert_resource_manager_request_kwargs(
  251. cls, request_kwargs, project_number, headers
  252. ):
  253. assert request_kwargs["url"] == cls.CLOUD_RESOURCE_MANAGER_URL + project_number
  254. assert request_kwargs["method"] == "GET"
  255. assert request_kwargs["headers"] == headers
  256. assert "body" not in request_kwargs
  257. def test_get_cred_info(self):
  258. credentials = self.make_credentials()
  259. assert not credentials.get_cred_info()
  260. credentials._cred_file_path = "/path/to/file"
  261. assert credentials.get_cred_info() == {
  262. "credential_source": "/path/to/file",
  263. "credential_type": "external account credentials",
  264. }
  265. credentials._service_account_impersonation_url = (
  266. self.SERVICE_ACCOUNT_IMPERSONATION_URL
  267. )
  268. assert credentials.get_cred_info() == {
  269. "credential_source": "/path/to/file",
  270. "credential_type": "external account credentials",
  271. "principal": SERVICE_ACCOUNT_EMAIL,
  272. }
  273. def test__make_copy_get_cred_info(self):
  274. credentials = self.make_credentials()
  275. credentials._cred_file_path = "/path/to/file"
  276. cred_copy = credentials._make_copy()
  277. assert cred_copy._cred_file_path == "/path/to/file"
  278. def test_default_state(self):
  279. credentials = self.make_credentials(
  280. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
  281. )
  282. # Token url and service account impersonation url should be set
  283. assert credentials._token_url
  284. assert credentials._service_account_impersonation_url
  285. # Not token acquired yet
  286. assert not credentials.token
  287. assert not credentials.valid
  288. # Expiration hasn't been set yet
  289. assert not credentials.expiry
  290. assert not credentials.expired
  291. # Scopes are required
  292. assert not credentials.scopes
  293. assert credentials.requires_scopes
  294. assert not credentials.quota_project_id
  295. # Token info url not set yet
  296. assert not credentials.token_info_url
  297. def test_nonworkforce_with_workforce_pool_user_project(self):
  298. with pytest.raises(ValueError) as excinfo:
  299. CredentialsImpl(
  300. audience=self.AUDIENCE,
  301. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  302. token_url=self.TOKEN_URL,
  303. credential_source=self.CREDENTIAL_SOURCE,
  304. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
  305. )
  306. assert excinfo.match(
  307. "workforce_pool_user_project should not be set for non-workforce "
  308. "pool credentials"
  309. )
  310. def test_with_scopes(self):
  311. credentials = self.make_credentials()
  312. assert not credentials.scopes
  313. assert credentials.requires_scopes
  314. scoped_credentials = credentials.with_scopes(["email"])
  315. assert scoped_credentials.has_scopes(["email"])
  316. assert not scoped_credentials.requires_scopes
  317. def test_with_scopes_workforce_pool(self):
  318. credentials = self.make_workforce_pool_credentials(
  319. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
  320. )
  321. assert not credentials.scopes
  322. assert credentials.requires_scopes
  323. scoped_credentials = credentials.with_scopes(["email"])
  324. assert scoped_credentials.has_scopes(["email"])
  325. assert not scoped_credentials.requires_scopes
  326. assert (
  327. scoped_credentials.info.get("workforce_pool_user_project")
  328. == self.WORKFORCE_POOL_USER_PROJECT
  329. )
  330. def test_with_scopes_using_user_and_default_scopes(self):
  331. credentials = self.make_credentials()
  332. assert not credentials.scopes
  333. assert credentials.requires_scopes
  334. scoped_credentials = credentials.with_scopes(
  335. ["email"], default_scopes=["profile"]
  336. )
  337. assert scoped_credentials.has_scopes(["email"])
  338. assert not scoped_credentials.has_scopes(["profile"])
  339. assert not scoped_credentials.requires_scopes
  340. assert scoped_credentials.scopes == ["email"]
  341. assert scoped_credentials.default_scopes == ["profile"]
  342. def test_with_scopes_using_default_scopes_only(self):
  343. credentials = self.make_credentials()
  344. assert not credentials.scopes
  345. assert credentials.requires_scopes
  346. scoped_credentials = credentials.with_scopes(None, default_scopes=["profile"])
  347. assert scoped_credentials.has_scopes(["profile"])
  348. assert not scoped_credentials.requires_scopes
  349. def test_with_scopes_full_options_propagated(self):
  350. credentials = self.make_credentials(
  351. client_id=CLIENT_ID,
  352. client_secret=CLIENT_SECRET,
  353. quota_project_id=self.QUOTA_PROJECT_ID,
  354. scopes=self.SCOPES,
  355. token_info_url=self.TOKEN_INFO_URL,
  356. default_scopes=["default1"],
  357. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  358. service_account_impersonation_options={"token_lifetime_seconds": 2800},
  359. )
  360. with mock.patch.object(
  361. external_account.Credentials, "__init__", return_value=None
  362. ) as mock_init:
  363. credentials.with_scopes(["email"], ["default2"])
  364. # Confirm with_scopes initialized the credential with the expected
  365. # parameters and scopes.
  366. mock_init.assert_called_once_with(
  367. audience=self.AUDIENCE,
  368. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  369. token_url=self.TOKEN_URL,
  370. token_info_url=self.TOKEN_INFO_URL,
  371. credential_source=self.CREDENTIAL_SOURCE,
  372. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  373. service_account_impersonation_options={"token_lifetime_seconds": 2800},
  374. client_id=CLIENT_ID,
  375. client_secret=CLIENT_SECRET,
  376. quota_project_id=self.QUOTA_PROJECT_ID,
  377. scopes=["email"],
  378. default_scopes=["default2"],
  379. universe_domain=DEFAULT_UNIVERSE_DOMAIN,
  380. )
  381. def test_with_token_uri(self):
  382. credentials = self.make_credentials()
  383. new_token_uri = "https://eu-sts.googleapis.com/v1/token"
  384. assert credentials._token_url == self.TOKEN_URL
  385. creds_with_new_token_uri = credentials.with_token_uri(new_token_uri)
  386. assert creds_with_new_token_uri._token_url == new_token_uri
  387. def test_with_token_uri_workforce_pool(self):
  388. credentials = self.make_workforce_pool_credentials(
  389. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
  390. )
  391. new_token_uri = "https://eu-sts.googleapis.com/v1/token"
  392. assert credentials._token_url == self.TOKEN_URL
  393. creds_with_new_token_uri = credentials.with_token_uri(new_token_uri)
  394. assert creds_with_new_token_uri._token_url == new_token_uri
  395. assert (
  396. creds_with_new_token_uri.info.get("workforce_pool_user_project")
  397. == self.WORKFORCE_POOL_USER_PROJECT
  398. )
  399. def test_with_quota_project(self):
  400. credentials = self.make_credentials()
  401. assert not credentials.scopes
  402. assert not credentials.quota_project_id
  403. quota_project_creds = credentials.with_quota_project("project-foo")
  404. assert quota_project_creds.quota_project_id == "project-foo"
  405. def test_with_quota_project_workforce_pool(self):
  406. credentials = self.make_workforce_pool_credentials(
  407. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
  408. )
  409. assert not credentials.scopes
  410. assert not credentials.quota_project_id
  411. quota_project_creds = credentials.with_quota_project("project-foo")
  412. assert quota_project_creds.quota_project_id == "project-foo"
  413. assert (
  414. quota_project_creds.info.get("workforce_pool_user_project")
  415. == self.WORKFORCE_POOL_USER_PROJECT
  416. )
  417. def test_with_quota_project_full_options_propagated(self):
  418. credentials = self.make_credentials(
  419. client_id=CLIENT_ID,
  420. client_secret=CLIENT_SECRET,
  421. token_info_url=self.TOKEN_INFO_URL,
  422. quota_project_id=self.QUOTA_PROJECT_ID,
  423. scopes=self.SCOPES,
  424. default_scopes=["default1"],
  425. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  426. service_account_impersonation_options={"token_lifetime_seconds": 2800},
  427. )
  428. with mock.patch.object(
  429. external_account.Credentials, "__init__", return_value=None
  430. ) as mock_init:
  431. new_cred = credentials.with_quota_project("project-foo")
  432. # Confirm with_quota_project initialized the credential with the
  433. # expected parameters.
  434. mock_init.assert_called_once_with(
  435. audience=self.AUDIENCE,
  436. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  437. token_url=self.TOKEN_URL,
  438. token_info_url=self.TOKEN_INFO_URL,
  439. credential_source=self.CREDENTIAL_SOURCE,
  440. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  441. service_account_impersonation_options={"token_lifetime_seconds": 2800},
  442. client_id=CLIENT_ID,
  443. client_secret=CLIENT_SECRET,
  444. quota_project_id=self.QUOTA_PROJECT_ID,
  445. scopes=self.SCOPES,
  446. default_scopes=["default1"],
  447. universe_domain=DEFAULT_UNIVERSE_DOMAIN,
  448. )
  449. # Confirm with_quota_project sets the correct quota project after
  450. # initialization.
  451. assert new_cred.quota_project_id == "project-foo"
  452. def test_info(self):
  453. credentials = self.make_credentials(universe_domain="dummy_universe.com")
  454. assert credentials.info == {
  455. "type": "external_account",
  456. "audience": self.AUDIENCE,
  457. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  458. "token_url": self.TOKEN_URL,
  459. "credential_source": self.CREDENTIAL_SOURCE.copy(),
  460. "universe_domain": "dummy_universe.com",
  461. }
  462. def test_universe_domain(self):
  463. credentials = self.make_credentials(universe_domain="dummy_universe.com")
  464. assert credentials.universe_domain == "dummy_universe.com"
  465. credentials = self.make_credentials()
  466. assert credentials.universe_domain == DEFAULT_UNIVERSE_DOMAIN
  467. def test_with_universe_domain(self):
  468. credentials = self.make_credentials()
  469. new_credentials = credentials.with_universe_domain("dummy_universe.com")
  470. assert new_credentials.universe_domain == "dummy_universe.com"
  471. def test_info_workforce_pool(self):
  472. credentials = self.make_workforce_pool_credentials(
  473. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
  474. )
  475. assert credentials.info == {
  476. "type": "external_account",
  477. "audience": self.WORKFORCE_AUDIENCE,
  478. "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
  479. "token_url": self.TOKEN_URL,
  480. "credential_source": self.CREDENTIAL_SOURCE.copy(),
  481. "workforce_pool_user_project": self.WORKFORCE_POOL_USER_PROJECT,
  482. "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
  483. }
  484. def test_info_with_full_options(self):
  485. credentials = self.make_credentials(
  486. client_id=CLIENT_ID,
  487. client_secret=CLIENT_SECRET,
  488. quota_project_id=self.QUOTA_PROJECT_ID,
  489. token_info_url=self.TOKEN_INFO_URL,
  490. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  491. service_account_impersonation_options={"token_lifetime_seconds": 2800},
  492. )
  493. assert credentials.info == {
  494. "type": "external_account",
  495. "audience": self.AUDIENCE,
  496. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  497. "token_url": self.TOKEN_URL,
  498. "token_info_url": self.TOKEN_INFO_URL,
  499. "service_account_impersonation_url": self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  500. "service_account_impersonation": {"token_lifetime_seconds": 2800},
  501. "credential_source": self.CREDENTIAL_SOURCE.copy(),
  502. "quota_project_id": self.QUOTA_PROJECT_ID,
  503. "client_id": CLIENT_ID,
  504. "client_secret": CLIENT_SECRET,
  505. "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
  506. }
  507. def test_service_account_email_without_impersonation(self):
  508. credentials = self.make_credentials()
  509. assert credentials.service_account_email is None
  510. def test_service_account_email_with_impersonation(self):
  511. credentials = self.make_credentials(
  512. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
  513. )
  514. assert credentials.service_account_email == SERVICE_ACCOUNT_EMAIL
  515. @pytest.mark.parametrize("audience", TEST_NON_USER_AUDIENCES)
  516. def test_is_user_with_non_users(self, audience):
  517. credentials = CredentialsImpl(
  518. audience=audience,
  519. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  520. token_url=self.TOKEN_URL,
  521. credential_source=self.CREDENTIAL_SOURCE,
  522. )
  523. assert credentials.is_user is False
  524. @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
  525. def test_is_user_with_users(self, audience):
  526. credentials = CredentialsImpl(
  527. audience=audience,
  528. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  529. token_url=self.TOKEN_URL,
  530. credential_source=self.CREDENTIAL_SOURCE,
  531. )
  532. assert credentials.is_user is True
  533. @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
  534. def test_is_user_with_users_and_impersonation(self, audience):
  535. # Initialize the credentials with service account impersonation.
  536. credentials = CredentialsImpl(
  537. audience=audience,
  538. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  539. token_url=self.TOKEN_URL,
  540. credential_source=self.CREDENTIAL_SOURCE,
  541. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  542. )
  543. # Even though the audience is for a workforce pool, since service account
  544. # impersonation is used, the credentials will represent a service account and
  545. # not a user.
  546. assert credentials.is_user is False
  547. @pytest.mark.parametrize("audience", TEST_NON_USER_AUDIENCES)
  548. def test_is_workforce_pool_with_non_users(self, audience):
  549. credentials = CredentialsImpl(
  550. audience=audience,
  551. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  552. token_url=self.TOKEN_URL,
  553. credential_source=self.CREDENTIAL_SOURCE,
  554. )
  555. assert credentials.is_workforce_pool is False
  556. @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
  557. def test_is_workforce_pool_with_users(self, audience):
  558. credentials = CredentialsImpl(
  559. audience=audience,
  560. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  561. token_url=self.TOKEN_URL,
  562. credential_source=self.CREDENTIAL_SOURCE,
  563. )
  564. assert credentials.is_workforce_pool is True
  565. @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
  566. def test_is_workforce_pool_with_users_and_impersonation(self, audience):
  567. # Initialize the credentials with workforce audience and service account
  568. # impersonation.
  569. credentials = CredentialsImpl(
  570. audience=audience,
  571. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  572. token_url=self.TOKEN_URL,
  573. credential_source=self.CREDENTIAL_SOURCE,
  574. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  575. )
  576. # Even though impersonation is used, is_workforce_pool should still return True.
  577. assert credentials.is_workforce_pool is True
  578. @pytest.mark.parametrize("mock_expires_in", [2800, "2800"])
  579. @mock.patch(
  580. "google.auth.metrics.python_and_auth_lib_version",
  581. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  582. )
  583. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  584. def test_refresh_without_client_auth_success(
  585. self, unused_utcnow, mock_auth_lib_value, mock_expires_in
  586. ):
  587. response = self.SUCCESS_RESPONSE.copy()
  588. # Test custom expiration to confirm expiry is set correctly.
  589. response["expires_in"] = mock_expires_in
  590. expected_expiry = datetime.datetime.min + datetime.timedelta(
  591. seconds=int(mock_expires_in)
  592. )
  593. headers = {
  594. "Content-Type": "application/x-www-form-urlencoded",
  595. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
  596. }
  597. request_data = {
  598. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  599. "audience": self.AUDIENCE,
  600. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  601. "subject_token": "subject_token_0",
  602. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  603. }
  604. request = self.make_mock_request(status=http_client.OK, data=response)
  605. credentials = self.make_credentials()
  606. credentials.refresh(request)
  607. self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
  608. assert credentials.valid
  609. assert credentials.expiry == expected_expiry
  610. assert not credentials.expired
  611. assert credentials.token == response["access_token"]
  612. @mock.patch(
  613. "google.auth.metrics.python_and_auth_lib_version",
  614. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  615. )
  616. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  617. @mock.patch(
  618. "google.auth.external_account.Credentials._mtls_required", return_value=True
  619. )
  620. @mock.patch(
  621. "google.auth.external_account.Credentials._get_mtls_cert_and_key_paths",
  622. return_value=("path/to/cert.pem", "path/to/key.pem"),
  623. )
  624. def test_refresh_with_mtls(
  625. self,
  626. mock_get_mtls_cert_and_key_paths,
  627. mock_mtls_required,
  628. unused_utcnow,
  629. mock_auth_lib_value,
  630. ):
  631. response = self.SUCCESS_RESPONSE.copy()
  632. # Test custom expiration to confirm expiry is set correctly.
  633. response["expires_in"] = 2800
  634. expected_expiry = datetime.datetime.min + datetime.timedelta(
  635. seconds=response["expires_in"]
  636. )
  637. headers = {
  638. "Content-Type": "application/x-www-form-urlencoded",
  639. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
  640. }
  641. request_data = {
  642. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  643. "audience": self.AUDIENCE,
  644. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  645. "subject_token": "subject_token_0",
  646. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  647. }
  648. request = self.make_mock_request(status=http_client.OK, data=response)
  649. credentials = self.make_credentials()
  650. credentials.refresh(request)
  651. expected_cert_path = ("path/to/cert.pem", "path/to/key.pem")
  652. self.assert_token_request_kwargs(
  653. request.call_args[1], headers, request_data, expected_cert_path
  654. )
  655. assert credentials.valid
  656. assert credentials.expiry == expected_expiry
  657. assert not credentials.expired
  658. assert credentials.token == response["access_token"]
  659. @mock.patch(
  660. "google.auth.metrics.python_and_auth_lib_version",
  661. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  662. )
  663. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  664. def test_refresh_workforce_without_client_auth_success(
  665. self, unused_utcnow, test_auth_lib_value
  666. ):
  667. response = self.SUCCESS_RESPONSE.copy()
  668. # Test custom expiration to confirm expiry is set correctly.
  669. response["expires_in"] = 2800
  670. expected_expiry = datetime.datetime.min + datetime.timedelta(
  671. seconds=response["expires_in"]
  672. )
  673. headers = {
  674. "Content-Type": "application/x-www-form-urlencoded",
  675. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
  676. }
  677. request_data = {
  678. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  679. "audience": self.WORKFORCE_AUDIENCE,
  680. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  681. "subject_token": "subject_token_0",
  682. "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
  683. "options": urllib.parse.quote(
  684. json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT})
  685. ),
  686. }
  687. request = self.make_mock_request(status=http_client.OK, data=response)
  688. credentials = self.make_workforce_pool_credentials(
  689. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
  690. )
  691. credentials.refresh(request)
  692. self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
  693. assert credentials.valid
  694. assert credentials.expiry == expected_expiry
  695. assert not credentials.expired
  696. assert credentials.token == response["access_token"]
  697. @mock.patch(
  698. "google.auth.metrics.python_and_auth_lib_version",
  699. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  700. )
  701. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  702. def test_refresh_workforce_with_client_auth_success(
  703. self, unused_utcnow, mock_auth_lib_value
  704. ):
  705. response = self.SUCCESS_RESPONSE.copy()
  706. # Test custom expiration to confirm expiry is set correctly.
  707. response["expires_in"] = 2800
  708. expected_expiry = datetime.datetime.min + datetime.timedelta(
  709. seconds=response["expires_in"]
  710. )
  711. headers = {
  712. "Content-Type": "application/x-www-form-urlencoded",
  713. "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
  714. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
  715. }
  716. request_data = {
  717. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  718. "audience": self.WORKFORCE_AUDIENCE,
  719. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  720. "subject_token": "subject_token_0",
  721. "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
  722. }
  723. request = self.make_mock_request(status=http_client.OK, data=response)
  724. # Client Auth will have higher priority over workforce_pool_user_project.
  725. credentials = self.make_workforce_pool_credentials(
  726. client_id=CLIENT_ID,
  727. client_secret=CLIENT_SECRET,
  728. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
  729. )
  730. credentials.refresh(request)
  731. self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
  732. assert credentials.valid
  733. assert credentials.expiry == expected_expiry
  734. assert not credentials.expired
  735. assert credentials.token == response["access_token"]
  736. @mock.patch(
  737. "google.auth.metrics.python_and_auth_lib_version",
  738. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  739. )
  740. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  741. def test_refresh_workforce_with_client_auth_and_no_workforce_project_success(
  742. self, unused_utcnow, mock_lib_version_value
  743. ):
  744. response = self.SUCCESS_RESPONSE.copy()
  745. # Test custom expiration to confirm expiry is set correctly.
  746. response["expires_in"] = 2800
  747. expected_expiry = datetime.datetime.min + datetime.timedelta(
  748. seconds=response["expires_in"]
  749. )
  750. headers = {
  751. "Content-Type": "application/x-www-form-urlencoded",
  752. "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
  753. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
  754. }
  755. request_data = {
  756. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  757. "audience": self.WORKFORCE_AUDIENCE,
  758. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  759. "subject_token": "subject_token_0",
  760. "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
  761. }
  762. request = self.make_mock_request(status=http_client.OK, data=response)
  763. # Client Auth will be sufficient for user project determination.
  764. credentials = self.make_workforce_pool_credentials(
  765. client_id=CLIENT_ID,
  766. client_secret=CLIENT_SECRET,
  767. workforce_pool_user_project=None,
  768. )
  769. credentials.refresh(request)
  770. self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
  771. assert credentials.valid
  772. assert credentials.expiry == expected_expiry
  773. assert not credentials.expired
  774. assert credentials.token == response["access_token"]
  775. @mock.patch(
  776. "google.auth.metrics.token_request_access_token_impersonate",
  777. return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  778. )
  779. @mock.patch(
  780. "google.auth.metrics.python_and_auth_lib_version",
  781. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  782. )
  783. def test_refresh_impersonation_without_client_auth_success(
  784. self, mock_metrics_header_value, mock_auth_lib_value
  785. ):
  786. # Simulate service account access token expires in 2800 seconds.
  787. expire_time = (
  788. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
  789. ).isoformat("T") + "Z"
  790. expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
  791. # STS token exchange request/response.
  792. token_response = self.SUCCESS_RESPONSE.copy()
  793. token_headers = {
  794. "Content-Type": "application/x-www-form-urlencoded",
  795. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
  796. }
  797. token_request_data = {
  798. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  799. "audience": self.AUDIENCE,
  800. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  801. "subject_token": "subject_token_0",
  802. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  803. "scope": "https://www.googleapis.com/auth/iam",
  804. }
  805. # Service account impersonation request/response.
  806. impersonation_response = {
  807. "accessToken": "SA_ACCESS_TOKEN",
  808. "expireTime": expire_time,
  809. }
  810. impersonation_headers = {
  811. "Content-Type": "application/json",
  812. "authorization": "Bearer {}".format(token_response["access_token"]),
  813. "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  814. "x-allowed-locations": "0x0",
  815. }
  816. impersonation_request_data = {
  817. "delegates": None,
  818. "scope": self.SCOPES,
  819. "lifetime": "3600s",
  820. }
  821. # Initialize mock request to handle token exchange and service account
  822. # impersonation request.
  823. request = self.make_mock_request(
  824. status=http_client.OK,
  825. data=token_response,
  826. impersonation_status=http_client.OK,
  827. impersonation_data=impersonation_response,
  828. )
  829. # Initialize credentials with service account impersonation.
  830. credentials = self.make_credentials(
  831. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  832. scopes=self.SCOPES,
  833. )
  834. credentials.refresh(request)
  835. # Only 2 requests should be processed.
  836. assert len(request.call_args_list) == 2
  837. # Verify token exchange request parameters.
  838. self.assert_token_request_kwargs(
  839. request.call_args_list[0][1], token_headers, token_request_data
  840. )
  841. # Verify service account impersonation request parameters.
  842. self.assert_impersonation_request_kwargs(
  843. request.call_args_list[1][1],
  844. impersonation_headers,
  845. impersonation_request_data,
  846. )
  847. assert credentials.valid
  848. assert credentials.expiry == expected_expiry
  849. assert not credentials.expired
  850. assert credentials.token == impersonation_response["accessToken"]
  851. @mock.patch(
  852. "google.auth.metrics.token_request_access_token_impersonate",
  853. return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  854. )
  855. @mock.patch(
  856. "google.auth.metrics.python_and_auth_lib_version",
  857. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  858. )
  859. @mock.patch(
  860. "google.auth.external_account.Credentials._mtls_required", return_value=True
  861. )
  862. @mock.patch(
  863. "google.auth.external_account.Credentials._get_mtls_cert_and_key_paths",
  864. return_value=("path/to/cert.pem", "path/to/key.pem"),
  865. )
  866. def test_refresh_impersonation_with_mtls_success(
  867. self,
  868. mock_get_mtls_cert_and_key_paths,
  869. mock_mtls_required,
  870. mock_metrics_header_value,
  871. mock_auth_lib_value,
  872. ):
  873. # Simulate service account access token expires in 2800 seconds.
  874. expire_time = (
  875. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
  876. ).isoformat("T") + "Z"
  877. expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
  878. # STS token exchange request/response.
  879. token_response = self.SUCCESS_RESPONSE.copy()
  880. token_headers = {
  881. "Content-Type": "application/x-www-form-urlencoded",
  882. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
  883. }
  884. token_request_data = {
  885. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  886. "audience": self.AUDIENCE,
  887. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  888. "subject_token": "subject_token_0",
  889. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  890. "scope": "https://www.googleapis.com/auth/iam",
  891. }
  892. # Service account impersonation request/response.
  893. impersonation_response = {
  894. "accessToken": "SA_ACCESS_TOKEN",
  895. "expireTime": expire_time,
  896. }
  897. impersonation_headers = {
  898. "Content-Type": "application/json",
  899. "authorization": "Bearer {}".format(token_response["access_token"]),
  900. "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  901. "x-allowed-locations": "0x0",
  902. }
  903. impersonation_request_data = {
  904. "delegates": None,
  905. "scope": self.SCOPES,
  906. "lifetime": "3600s",
  907. }
  908. # Initialize mock request to handle token exchange and service account
  909. # impersonation request.
  910. request = self.make_mock_request(
  911. status=http_client.OK,
  912. data=token_response,
  913. impersonation_status=http_client.OK,
  914. impersonation_data=impersonation_response,
  915. )
  916. # Initialize credentials with service account impersonation.
  917. credentials = self.make_credentials(
  918. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  919. scopes=self.SCOPES,
  920. )
  921. credentials.refresh(request)
  922. # Only 2 requests should be processed.
  923. assert len(request.call_args_list) == 2
  924. # Verify token exchange request parameters.
  925. expected_cert_paths = ("path/to/cert.pem", "path/to/key.pem")
  926. self.assert_token_request_kwargs(
  927. request.call_args_list[0][1],
  928. token_headers,
  929. token_request_data,
  930. expected_cert_paths,
  931. )
  932. # Verify service account impersonation request parameters.
  933. self.assert_impersonation_request_kwargs(
  934. request.call_args_list[1][1],
  935. impersonation_headers,
  936. impersonation_request_data,
  937. expected_cert_paths,
  938. )
  939. assert credentials.valid
  940. assert credentials.expiry == expected_expiry
  941. assert not credentials.expired
  942. assert credentials.token == impersonation_response["accessToken"]
  943. @mock.patch(
  944. "google.auth.metrics.token_request_access_token_impersonate",
  945. return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  946. )
  947. @mock.patch(
  948. "google.auth.metrics.python_and_auth_lib_version",
  949. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  950. )
  951. def test_refresh_workforce_impersonation_without_client_auth_success(
  952. self, mock_metrics_header_value, mock_auth_lib_value
  953. ):
  954. # Simulate service account access token expires in 2800 seconds.
  955. expire_time = (
  956. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
  957. ).isoformat("T") + "Z"
  958. expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
  959. # STS token exchange request/response.
  960. token_response = self.SUCCESS_RESPONSE.copy()
  961. token_headers = {
  962. "Content-Type": "application/x-www-form-urlencoded",
  963. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
  964. }
  965. token_request_data = {
  966. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  967. "audience": self.WORKFORCE_AUDIENCE,
  968. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  969. "subject_token": "subject_token_0",
  970. "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
  971. "scope": "https://www.googleapis.com/auth/iam",
  972. "options": urllib.parse.quote(
  973. json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT})
  974. ),
  975. }
  976. # Service account impersonation request/response.
  977. impersonation_response = {
  978. "accessToken": "SA_ACCESS_TOKEN",
  979. "expireTime": expire_time,
  980. }
  981. impersonation_headers = {
  982. "Content-Type": "application/json",
  983. "authorization": "Bearer {}".format(token_response["access_token"]),
  984. "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  985. "x-allowed-locations": "0x0",
  986. }
  987. impersonation_request_data = {
  988. "delegates": None,
  989. "scope": self.SCOPES,
  990. "lifetime": "3600s",
  991. }
  992. # Initialize mock request to handle token exchange and service account
  993. # impersonation request.
  994. request = self.make_mock_request(
  995. status=http_client.OK,
  996. data=token_response,
  997. impersonation_status=http_client.OK,
  998. impersonation_data=impersonation_response,
  999. )
  1000. # Initialize credentials with service account impersonation.
  1001. credentials = self.make_workforce_pool_credentials(
  1002. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  1003. scopes=self.SCOPES,
  1004. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
  1005. )
  1006. credentials.refresh(request)
  1007. # Only 2 requests should be processed.
  1008. assert len(request.call_args_list) == 2
  1009. # Verify token exchange request parameters.
  1010. self.assert_token_request_kwargs(
  1011. request.call_args_list[0][1], token_headers, token_request_data
  1012. )
  1013. # Verify service account impersonation request parameters.
  1014. self.assert_impersonation_request_kwargs(
  1015. request.call_args_list[1][1],
  1016. impersonation_headers,
  1017. impersonation_request_data,
  1018. )
  1019. assert credentials.valid
  1020. assert credentials.expiry == expected_expiry
  1021. assert not credentials.expired
  1022. assert credentials.token == impersonation_response["accessToken"]
  1023. @mock.patch(
  1024. "google.auth.metrics.python_and_auth_lib_version",
  1025. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  1026. )
  1027. def test_refresh_without_client_auth_success_explicit_user_scopes_ignore_default_scopes(
  1028. self, mock_auth_lib_value
  1029. ):
  1030. headers = {
  1031. "Content-Type": "application/x-www-form-urlencoded",
  1032. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
  1033. }
  1034. request_data = {
  1035. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  1036. "audience": self.AUDIENCE,
  1037. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  1038. "scope": "scope1 scope2",
  1039. "subject_token": "subject_token_0",
  1040. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  1041. }
  1042. request = self.make_mock_request(
  1043. status=http_client.OK, data=self.SUCCESS_RESPONSE
  1044. )
  1045. credentials = self.make_credentials(
  1046. scopes=["scope1", "scope2"],
  1047. # Default scopes will be ignored in favor of user scopes.
  1048. default_scopes=["ignored"],
  1049. )
  1050. credentials.refresh(request)
  1051. self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
  1052. assert credentials.valid
  1053. assert not credentials.expired
  1054. assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
  1055. assert credentials.has_scopes(["scope1", "scope2"])
  1056. assert not credentials.has_scopes(["ignored"])
  1057. @mock.patch(
  1058. "google.auth.metrics.python_and_auth_lib_version",
  1059. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  1060. )
  1061. def test_refresh_without_client_auth_success_explicit_default_scopes_only(
  1062. self, mock_auth_lib_value
  1063. ):
  1064. headers = {
  1065. "Content-Type": "application/x-www-form-urlencoded",
  1066. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
  1067. }
  1068. request_data = {
  1069. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  1070. "audience": self.AUDIENCE,
  1071. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  1072. "scope": "scope1 scope2",
  1073. "subject_token": "subject_token_0",
  1074. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  1075. }
  1076. request = self.make_mock_request(
  1077. status=http_client.OK, data=self.SUCCESS_RESPONSE
  1078. )
  1079. credentials = self.make_credentials(
  1080. scopes=None,
  1081. # Default scopes will be used since user scopes are none.
  1082. default_scopes=["scope1", "scope2"],
  1083. )
  1084. credentials.refresh(request)
  1085. self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
  1086. assert credentials.valid
  1087. assert not credentials.expired
  1088. assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
  1089. assert credentials.has_scopes(["scope1", "scope2"])
  1090. def test_refresh_without_client_auth_error(self):
  1091. request = self.make_mock_request(
  1092. status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
  1093. )
  1094. credentials = self.make_credentials()
  1095. with pytest.raises(exceptions.OAuthError) as excinfo:
  1096. credentials.refresh(request)
  1097. assert excinfo.match(
  1098. r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
  1099. )
  1100. assert not credentials.expired
  1101. assert credentials.token is None
  1102. def test_refresh_impersonation_without_client_auth_error(self):
  1103. request = self.make_mock_request(
  1104. status=http_client.OK,
  1105. data=self.SUCCESS_RESPONSE,
  1106. impersonation_status=http_client.BAD_REQUEST,
  1107. impersonation_data=self.IMPERSONATION_ERROR_RESPONSE,
  1108. )
  1109. credentials = self.make_credentials(
  1110. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  1111. scopes=self.SCOPES,
  1112. )
  1113. with pytest.raises(exceptions.RefreshError) as excinfo:
  1114. credentials.refresh(request)
  1115. assert excinfo.match(r"Unable to acquire impersonated credentials")
  1116. assert not credentials.expired
  1117. assert credentials.token is None
  1118. def test_refresh_impersonation_invalid_impersonated_url_error(self):
  1119. credentials = self.make_credentials(
  1120. service_account_impersonation_url="https://iamcredentials.googleapis.com/v1/invalid",
  1121. scopes=self.SCOPES,
  1122. )
  1123. with pytest.raises(exceptions.RefreshError) as excinfo:
  1124. credentials.refresh(None)
  1125. assert excinfo.match(
  1126. r"Unable to determine target principal from service account impersonation URL."
  1127. )
  1128. assert not credentials.expired
  1129. assert credentials.token is None
  1130. @mock.patch(
  1131. "google.auth.metrics.python_and_auth_lib_version",
  1132. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  1133. )
  1134. def test_refresh_with_client_auth_success(self, mock_auth_lib_value):
  1135. headers = {
  1136. "Content-Type": "application/x-www-form-urlencoded",
  1137. "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
  1138. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
  1139. }
  1140. request_data = {
  1141. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  1142. "audience": self.AUDIENCE,
  1143. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  1144. "subject_token": "subject_token_0",
  1145. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  1146. }
  1147. request = self.make_mock_request(
  1148. status=http_client.OK, data=self.SUCCESS_RESPONSE
  1149. )
  1150. credentials = self.make_credentials(
  1151. client_id=CLIENT_ID, client_secret=CLIENT_SECRET
  1152. )
  1153. credentials.refresh(request)
  1154. self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
  1155. assert credentials.valid
  1156. assert not credentials.expired
  1157. assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
  1158. @mock.patch(
  1159. "google.auth.metrics.token_request_access_token_impersonate",
  1160. return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  1161. )
  1162. @mock.patch(
  1163. "google.auth.metrics.python_and_auth_lib_version",
  1164. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  1165. )
  1166. def test_refresh_impersonation_with_client_auth_success_ignore_default_scopes(
  1167. self, mock_metrics_header_value, mock_auth_lib_value
  1168. ):
  1169. # Simulate service account access token expires in 2800 seconds.
  1170. expire_time = (
  1171. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
  1172. ).isoformat("T") + "Z"
  1173. expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
  1174. # STS token exchange request/response.
  1175. token_response = self.SUCCESS_RESPONSE.copy()
  1176. token_headers = {
  1177. "Content-Type": "application/x-www-form-urlencoded",
  1178. "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
  1179. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
  1180. }
  1181. token_request_data = {
  1182. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  1183. "audience": self.AUDIENCE,
  1184. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  1185. "subject_token": "subject_token_0",
  1186. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  1187. "scope": "https://www.googleapis.com/auth/iam",
  1188. }
  1189. # Service account impersonation request/response.
  1190. impersonation_response = {
  1191. "accessToken": "SA_ACCESS_TOKEN",
  1192. "expireTime": expire_time,
  1193. }
  1194. impersonation_headers = {
  1195. "Content-Type": "application/json",
  1196. "authorization": "Bearer {}".format(token_response["access_token"]),
  1197. "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  1198. "x-allowed-locations": "0x0",
  1199. }
  1200. impersonation_request_data = {
  1201. "delegates": None,
  1202. "scope": self.SCOPES,
  1203. "lifetime": "3600s",
  1204. }
  1205. # Initialize mock request to handle token exchange and service account
  1206. # impersonation request.
  1207. request = self.make_mock_request(
  1208. status=http_client.OK,
  1209. data=token_response,
  1210. impersonation_status=http_client.OK,
  1211. impersonation_data=impersonation_response,
  1212. )
  1213. # Initialize credentials with service account impersonation and basic auth.
  1214. credentials = self.make_credentials(
  1215. client_id=CLIENT_ID,
  1216. client_secret=CLIENT_SECRET,
  1217. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  1218. scopes=self.SCOPES,
  1219. # Default scopes will be ignored since user scopes are specified.
  1220. default_scopes=["ignored"],
  1221. )
  1222. credentials.refresh(request)
  1223. # Only 2 requests should be processed.
  1224. assert len(request.call_args_list) == 2
  1225. # Verify token exchange request parameters.
  1226. self.assert_token_request_kwargs(
  1227. request.call_args_list[0][1], token_headers, token_request_data
  1228. )
  1229. # Verify service account impersonation request parameters.
  1230. self.assert_impersonation_request_kwargs(
  1231. request.call_args_list[1][1],
  1232. impersonation_headers,
  1233. impersonation_request_data,
  1234. )
  1235. assert credentials.valid
  1236. assert credentials.expiry == expected_expiry
  1237. assert not credentials.expired
  1238. assert credentials.token == impersonation_response["accessToken"]
  1239. @mock.patch(
  1240. "google.auth.metrics.token_request_access_token_impersonate",
  1241. return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  1242. )
  1243. @mock.patch(
  1244. "google.auth.metrics.python_and_auth_lib_version",
  1245. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  1246. )
  1247. def test_refresh_impersonation_with_client_auth_success_use_default_scopes(
  1248. self, mock_metrics_header_value, mock_auth_lib_value
  1249. ):
  1250. # Simulate service account access token expires in 2800 seconds.
  1251. expire_time = (
  1252. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
  1253. ).isoformat("T") + "Z"
  1254. expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
  1255. # STS token exchange request/response.
  1256. token_response = self.SUCCESS_RESPONSE.copy()
  1257. token_headers = {
  1258. "Content-Type": "application/x-www-form-urlencoded",
  1259. "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
  1260. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
  1261. }
  1262. token_request_data = {
  1263. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  1264. "audience": self.AUDIENCE,
  1265. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  1266. "subject_token": "subject_token_0",
  1267. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  1268. "scope": "https://www.googleapis.com/auth/iam",
  1269. }
  1270. # Service account impersonation request/response.
  1271. impersonation_response = {
  1272. "accessToken": "SA_ACCESS_TOKEN",
  1273. "expireTime": expire_time,
  1274. }
  1275. impersonation_headers = {
  1276. "Content-Type": "application/json",
  1277. "authorization": "Bearer {}".format(token_response["access_token"]),
  1278. "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  1279. "x-allowed-locations": "0x0",
  1280. }
  1281. impersonation_request_data = {
  1282. "delegates": None,
  1283. "scope": self.SCOPES,
  1284. "lifetime": "3600s",
  1285. }
  1286. # Initialize mock request to handle token exchange and service account
  1287. # impersonation request.
  1288. request = self.make_mock_request(
  1289. status=http_client.OK,
  1290. data=token_response,
  1291. impersonation_status=http_client.OK,
  1292. impersonation_data=impersonation_response,
  1293. )
  1294. # Initialize credentials with service account impersonation and basic auth.
  1295. credentials = self.make_credentials(
  1296. client_id=CLIENT_ID,
  1297. client_secret=CLIENT_SECRET,
  1298. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  1299. scopes=None,
  1300. # Default scopes will be used since user specified scopes are none.
  1301. default_scopes=self.SCOPES,
  1302. )
  1303. credentials.refresh(request)
  1304. # Only 2 requests should be processed.
  1305. assert len(request.call_args_list) == 2
  1306. # Verify token exchange request parameters.
  1307. self.assert_token_request_kwargs(
  1308. request.call_args_list[0][1], token_headers, token_request_data
  1309. )
  1310. # Verify service account impersonation request parameters.
  1311. self.assert_impersonation_request_kwargs(
  1312. request.call_args_list[1][1],
  1313. impersonation_headers,
  1314. impersonation_request_data,
  1315. )
  1316. assert credentials.valid
  1317. assert credentials.expiry == expected_expiry
  1318. assert not credentials.expired
  1319. assert credentials.token == impersonation_response["accessToken"]
  1320. def test_apply_without_quota_project_id(self):
  1321. headers = {}
  1322. request = self.make_mock_request(
  1323. status=http_client.OK, data=self.SUCCESS_RESPONSE
  1324. )
  1325. credentials = self.make_credentials()
  1326. credentials.refresh(request)
  1327. credentials.apply(headers)
  1328. assert headers == {
  1329. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
  1330. "x-allowed-locations": "0x0",
  1331. }
  1332. def test_apply_workforce_without_quota_project_id(self):
  1333. headers = {}
  1334. request = self.make_mock_request(
  1335. status=http_client.OK, data=self.SUCCESS_RESPONSE
  1336. )
  1337. credentials = self.make_workforce_pool_credentials(
  1338. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
  1339. )
  1340. credentials.refresh(request)
  1341. credentials.apply(headers)
  1342. assert headers == {
  1343. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
  1344. "x-allowed-locations": "0x0",
  1345. }
  1346. def test_apply_impersonation_without_quota_project_id(self):
  1347. expire_time = (
  1348. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
  1349. ).isoformat("T") + "Z"
  1350. # Service account impersonation response.
  1351. impersonation_response = {
  1352. "accessToken": "SA_ACCESS_TOKEN",
  1353. "expireTime": expire_time,
  1354. }
  1355. # Initialize mock request to handle token exchange and service account
  1356. # impersonation request.
  1357. request = self.make_mock_request(
  1358. status=http_client.OK,
  1359. data=self.SUCCESS_RESPONSE.copy(),
  1360. impersonation_status=http_client.OK,
  1361. impersonation_data=impersonation_response,
  1362. )
  1363. # Initialize credentials with service account impersonation.
  1364. credentials = self.make_credentials(
  1365. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  1366. scopes=self.SCOPES,
  1367. )
  1368. headers = {}
  1369. credentials.refresh(request)
  1370. credentials.apply(headers)
  1371. assert headers == {
  1372. "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
  1373. "x-allowed-locations": "0x0",
  1374. }
  1375. def test_apply_with_quota_project_id(self):
  1376. headers = {"other": "header-value"}
  1377. request = self.make_mock_request(
  1378. status=http_client.OK, data=self.SUCCESS_RESPONSE
  1379. )
  1380. credentials = self.make_credentials(quota_project_id=self.QUOTA_PROJECT_ID)
  1381. credentials.refresh(request)
  1382. credentials.apply(headers)
  1383. assert headers == {
  1384. "other": "header-value",
  1385. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
  1386. "x-goog-user-project": self.QUOTA_PROJECT_ID,
  1387. "x-allowed-locations": "0x0",
  1388. }
  1389. def test_apply_impersonation_with_quota_project_id(self):
  1390. expire_time = (
  1391. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
  1392. ).isoformat("T") + "Z"
  1393. # Service account impersonation response.
  1394. impersonation_response = {
  1395. "accessToken": "SA_ACCESS_TOKEN",
  1396. "expireTime": expire_time,
  1397. }
  1398. # Initialize mock request to handle token exchange and service account
  1399. # impersonation request.
  1400. request = self.make_mock_request(
  1401. status=http_client.OK,
  1402. data=self.SUCCESS_RESPONSE.copy(),
  1403. impersonation_status=http_client.OK,
  1404. impersonation_data=impersonation_response,
  1405. )
  1406. # Initialize credentials with service account impersonation.
  1407. credentials = self.make_credentials(
  1408. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  1409. scopes=self.SCOPES,
  1410. quota_project_id=self.QUOTA_PROJECT_ID,
  1411. )
  1412. headers = {"other": "header-value"}
  1413. credentials.refresh(request)
  1414. credentials.apply(headers)
  1415. assert headers == {
  1416. "other": "header-value",
  1417. "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
  1418. "x-goog-user-project": self.QUOTA_PROJECT_ID,
  1419. "x-allowed-locations": "0x0",
  1420. }
  1421. def test_before_request(self):
  1422. headers = {"other": "header-value"}
  1423. request = self.make_mock_request(
  1424. status=http_client.OK, data=self.SUCCESS_RESPONSE
  1425. )
  1426. credentials = self.make_credentials()
  1427. # First call should call refresh, setting the token.
  1428. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1429. assert headers == {
  1430. "other": "header-value",
  1431. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
  1432. "x-allowed-locations": "0x0",
  1433. }
  1434. # Second call shouldn't call refresh.
  1435. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1436. assert headers == {
  1437. "other": "header-value",
  1438. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
  1439. "x-allowed-locations": "0x0",
  1440. }
  1441. def test_before_request_workforce(self):
  1442. headers = {"other": "header-value"}
  1443. request = self.make_mock_request(
  1444. status=http_client.OK, data=self.SUCCESS_RESPONSE
  1445. )
  1446. credentials = self.make_workforce_pool_credentials(
  1447. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
  1448. )
  1449. # First call should call refresh, setting the token.
  1450. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1451. assert headers == {
  1452. "other": "header-value",
  1453. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
  1454. "x-allowed-locations": "0x0",
  1455. }
  1456. # Second call shouldn't call refresh.
  1457. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1458. assert headers == {
  1459. "other": "header-value",
  1460. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
  1461. "x-allowed-locations": "0x0",
  1462. }
  1463. def test_before_request_impersonation(self):
  1464. expire_time = (
  1465. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
  1466. ).isoformat("T") + "Z"
  1467. # Service account impersonation response.
  1468. impersonation_response = {
  1469. "accessToken": "SA_ACCESS_TOKEN",
  1470. "expireTime": expire_time,
  1471. }
  1472. # Initialize mock request to handle token exchange and service account
  1473. # impersonation request.
  1474. request = self.make_mock_request(
  1475. status=http_client.OK,
  1476. data=self.SUCCESS_RESPONSE.copy(),
  1477. impersonation_status=http_client.OK,
  1478. impersonation_data=impersonation_response,
  1479. )
  1480. headers = {"other": "header-value"}
  1481. credentials = self.make_credentials(
  1482. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
  1483. )
  1484. # First call should call refresh, setting the token.
  1485. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1486. assert headers == {
  1487. "other": "header-value",
  1488. "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
  1489. "x-allowed-locations": "0x0",
  1490. }
  1491. # Second call shouldn't call refresh.
  1492. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1493. assert headers == {
  1494. "other": "header-value",
  1495. "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
  1496. "x-allowed-locations": "0x0",
  1497. }
  1498. @mock.patch("google.auth._helpers.utcnow")
  1499. def test_before_request_expired(self, utcnow):
  1500. headers = {}
  1501. request = self.make_mock_request(
  1502. status=http_client.OK, data=self.SUCCESS_RESPONSE
  1503. )
  1504. credentials = self.make_credentials()
  1505. credentials.token = "token"
  1506. utcnow.return_value = datetime.datetime.min
  1507. # Set the expiration to one second more than now plus the clock skew
  1508. # accomodation. These credentials should be valid.
  1509. credentials.expiry = (
  1510. datetime.datetime.min
  1511. + _helpers.REFRESH_THRESHOLD
  1512. + datetime.timedelta(seconds=1)
  1513. )
  1514. assert credentials.valid
  1515. assert not credentials.expired
  1516. assert credentials.token_state == TokenState.FRESH
  1517. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1518. # Cached token should be used.
  1519. assert headers == {
  1520. "authorization": "Bearer token",
  1521. "x-allowed-locations": "0x0",
  1522. }
  1523. # Next call should simulate 1 second passed.
  1524. utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)
  1525. assert not credentials.valid
  1526. assert credentials.expired
  1527. assert credentials.token_state == TokenState.STALE
  1528. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1529. assert credentials.token_state == TokenState.FRESH
  1530. # New token should be retrieved.
  1531. assert headers == {
  1532. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
  1533. "x-allowed-locations": "0x0",
  1534. }
  1535. @mock.patch("google.auth._helpers.utcnow")
  1536. def test_before_request_impersonation_expired(self, utcnow):
  1537. headers = {}
  1538. expire_time = (
  1539. datetime.datetime.min + datetime.timedelta(seconds=3601)
  1540. ).isoformat("T") + "Z"
  1541. # Service account impersonation response.
  1542. impersonation_response = {
  1543. "accessToken": "SA_ACCESS_TOKEN",
  1544. "expireTime": expire_time,
  1545. }
  1546. # Initialize mock request to handle token exchange and service account
  1547. # impersonation request.
  1548. request = self.make_mock_request(
  1549. status=http_client.OK,
  1550. data=self.SUCCESS_RESPONSE.copy(),
  1551. impersonation_status=http_client.OK,
  1552. impersonation_data=impersonation_response,
  1553. )
  1554. credentials = self.make_credentials(
  1555. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
  1556. )
  1557. credentials.token = "token"
  1558. utcnow.return_value = datetime.datetime.min
  1559. # Set the expiration to one second more than now plus the clock skew
  1560. # accomodation. These credentials should be valid.
  1561. credentials.expiry = (
  1562. datetime.datetime.min
  1563. + _helpers.REFRESH_THRESHOLD
  1564. + datetime.timedelta(seconds=1)
  1565. )
  1566. assert credentials.valid
  1567. assert not credentials.expired
  1568. assert credentials.token_state == TokenState.FRESH
  1569. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1570. assert credentials.token_state == TokenState.FRESH
  1571. # Cached token should be used.
  1572. assert headers == {
  1573. "authorization": "Bearer token",
  1574. "x-allowed-locations": "0x0",
  1575. }
  1576. # Next call should simulate 1 second passed. This will trigger the expiration
  1577. # threshold.
  1578. utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)
  1579. assert not credentials.valid
  1580. assert credentials.expired
  1581. assert credentials.token_state == TokenState.STALE
  1582. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1583. assert credentials.token_state == TokenState.FRESH
  1584. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1585. # New token should be retrieved.
  1586. assert headers == {
  1587. "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
  1588. "x-allowed-locations": "0x0",
  1589. }
  1590. @pytest.mark.parametrize(
  1591. "audience",
  1592. [
  1593. # Legacy K8s audience format.
  1594. "identitynamespace:1f12345:my_provider",
  1595. # Unrealistic audiences.
  1596. "//iam.googleapis.com/projects",
  1597. "//iam.googleapis.com/projects/",
  1598. "//iam.googleapis.com/project/123456",
  1599. "//iam.googleapis.com/projects//123456",
  1600. "//iam.googleapis.com/prefix_projects/123456",
  1601. "//iam.googleapis.com/projects_suffix/123456",
  1602. ],
  1603. )
  1604. def test_project_number_indeterminable(self, audience):
  1605. credentials = CredentialsImpl(
  1606. audience=audience,
  1607. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  1608. token_url=self.TOKEN_URL,
  1609. credential_source=self.CREDENTIAL_SOURCE,
  1610. )
  1611. assert credentials.project_number is None
  1612. assert credentials.get_project_id(None) is None
  1613. def test_project_number_determinable(self):
  1614. credentials = CredentialsImpl(
  1615. audience=self.AUDIENCE,
  1616. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  1617. token_url=self.TOKEN_URL,
  1618. credential_source=self.CREDENTIAL_SOURCE,
  1619. )
  1620. assert credentials.project_number == self.PROJECT_NUMBER
  1621. def test_project_number_workforce(self):
  1622. credentials = CredentialsImpl(
  1623. audience=self.WORKFORCE_AUDIENCE,
  1624. subject_token_type=self.WORKFORCE_SUBJECT_TOKEN_TYPE,
  1625. token_url=self.TOKEN_URL,
  1626. credential_source=self.CREDENTIAL_SOURCE,
  1627. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
  1628. )
  1629. assert credentials.project_number is None
  1630. def test_project_id_without_scopes(self):
  1631. # Initialize credentials with no scopes.
  1632. credentials = CredentialsImpl(
  1633. audience=self.AUDIENCE,
  1634. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  1635. token_url=self.TOKEN_URL,
  1636. credential_source=self.CREDENTIAL_SOURCE,
  1637. )
  1638. assert credentials.get_project_id(None) is None
  1639. @mock.patch(
  1640. "google.auth.metrics.token_request_access_token_impersonate",
  1641. return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  1642. )
  1643. @mock.patch(
  1644. "google.auth.metrics.python_and_auth_lib_version",
  1645. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  1646. )
  1647. def test_get_project_id_cloud_resource_manager_success(
  1648. self, mock_metrics_header_value, mock_auth_lib_value
  1649. ):
  1650. # STS token exchange request/response.
  1651. token_response = self.SUCCESS_RESPONSE.copy()
  1652. token_headers = {
  1653. "Content-Type": "application/x-www-form-urlencoded",
  1654. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
  1655. }
  1656. token_request_data = {
  1657. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  1658. "audience": self.AUDIENCE,
  1659. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  1660. "subject_token": "subject_token_0",
  1661. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  1662. "scope": "https://www.googleapis.com/auth/iam",
  1663. }
  1664. # Service account impersonation request/response.
  1665. expire_time = (
  1666. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
  1667. ).isoformat("T") + "Z"
  1668. expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
  1669. impersonation_response = {
  1670. "accessToken": "SA_ACCESS_TOKEN",
  1671. "expireTime": expire_time,
  1672. }
  1673. impersonation_headers = {
  1674. "Content-Type": "application/json",
  1675. "x-goog-user-project": self.QUOTA_PROJECT_ID,
  1676. "authorization": "Bearer {}".format(token_response["access_token"]),
  1677. "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  1678. "x-allowed-locations": "0x0",
  1679. }
  1680. impersonation_request_data = {
  1681. "delegates": None,
  1682. "scope": self.SCOPES,
  1683. "lifetime": "3600s",
  1684. }
  1685. # Initialize mock request to handle token exchange, service account
  1686. # impersonation and cloud resource manager request.
  1687. request = self.make_mock_request(
  1688. status=http_client.OK,
  1689. data=self.SUCCESS_RESPONSE.copy(),
  1690. impersonation_status=http_client.OK,
  1691. impersonation_data=impersonation_response,
  1692. cloud_resource_manager_status=http_client.OK,
  1693. cloud_resource_manager_data=self.CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE,
  1694. )
  1695. credentials = self.make_credentials(
  1696. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  1697. scopes=self.SCOPES,
  1698. quota_project_id=self.QUOTA_PROJECT_ID,
  1699. )
  1700. # Expected project ID from cloud resource manager response should be returned.
  1701. project_id = credentials.get_project_id(request)
  1702. assert project_id == self.PROJECT_ID
  1703. # 3 requests should be processed.
  1704. assert len(request.call_args_list) == 3
  1705. # Verify token exchange request parameters.
  1706. self.assert_token_request_kwargs(
  1707. request.call_args_list[0][1], token_headers, token_request_data
  1708. )
  1709. # Verify service account impersonation request parameters.
  1710. self.assert_impersonation_request_kwargs(
  1711. request.call_args_list[1][1],
  1712. impersonation_headers,
  1713. impersonation_request_data,
  1714. )
  1715. # In the process of getting project ID, an access token should be
  1716. # retrieved.
  1717. assert credentials.valid
  1718. assert credentials.expiry == expected_expiry
  1719. assert not credentials.expired
  1720. assert credentials.token == impersonation_response["accessToken"]
  1721. # Verify cloud resource manager request parameters.
  1722. self.assert_resource_manager_request_kwargs(
  1723. request.call_args_list[2][1],
  1724. self.PROJECT_NUMBER,
  1725. {
  1726. "x-goog-user-project": self.QUOTA_PROJECT_ID,
  1727. "authorization": "Bearer {}".format(
  1728. impersonation_response["accessToken"]
  1729. ),
  1730. "x-allowed-locations": "0x0",
  1731. },
  1732. )
  1733. # Calling get_project_id again should return the cached project_id.
  1734. project_id = credentials.get_project_id(request)
  1735. assert project_id == self.PROJECT_ID
  1736. # No additional requests.
  1737. assert len(request.call_args_list) == 3
  1738. @mock.patch(
  1739. "google.auth.metrics.python_and_auth_lib_version",
  1740. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  1741. )
  1742. def test_workforce_pool_get_project_id_cloud_resource_manager_success(
  1743. self, mock_auth_lib_value
  1744. ):
  1745. # STS token exchange request/response.
  1746. token_headers = {
  1747. "Content-Type": "application/x-www-form-urlencoded",
  1748. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
  1749. }
  1750. token_request_data = {
  1751. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  1752. "audience": self.WORKFORCE_AUDIENCE,
  1753. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  1754. "subject_token": "subject_token_0",
  1755. "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
  1756. "scope": "scope1 scope2",
  1757. "options": urllib.parse.quote(
  1758. json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT})
  1759. ),
  1760. }
  1761. # Initialize mock request to handle token exchange and cloud resource
  1762. # manager request.
  1763. request = self.make_mock_request(
  1764. status=http_client.OK,
  1765. data=self.SUCCESS_RESPONSE.copy(),
  1766. cloud_resource_manager_status=http_client.OK,
  1767. cloud_resource_manager_data=self.CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE,
  1768. )
  1769. credentials = self.make_workforce_pool_credentials(
  1770. scopes=self.SCOPES,
  1771. quota_project_id=self.QUOTA_PROJECT_ID,
  1772. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
  1773. )
  1774. # Expected project ID from cloud resource manager response should be returned.
  1775. project_id = credentials.get_project_id(request)
  1776. assert project_id == self.PROJECT_ID
  1777. # 2 requests should be processed.
  1778. assert len(request.call_args_list) == 2
  1779. # Verify token exchange request parameters.
  1780. self.assert_token_request_kwargs(
  1781. request.call_args_list[0][1], token_headers, token_request_data
  1782. )
  1783. # In the process of getting project ID, an access token should be
  1784. # retrieved.
  1785. assert credentials.valid
  1786. assert not credentials.expired
  1787. assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
  1788. # Verify cloud resource manager request parameters.
  1789. self.assert_resource_manager_request_kwargs(
  1790. request.call_args_list[1][1],
  1791. self.WORKFORCE_POOL_USER_PROJECT,
  1792. {
  1793. "x-goog-user-project": self.QUOTA_PROJECT_ID,
  1794. "authorization": "Bearer {}".format(
  1795. self.SUCCESS_RESPONSE["access_token"]
  1796. ),
  1797. "x-allowed-locations": "0x0",
  1798. },
  1799. )
  1800. # Calling get_project_id again should return the cached project_id.
  1801. project_id = credentials.get_project_id(request)
  1802. assert project_id == self.PROJECT_ID
  1803. # No additional requests.
  1804. assert len(request.call_args_list) == 2
  1805. @mock.patch(
  1806. "google.auth.metrics.token_request_access_token_impersonate",
  1807. return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  1808. )
  1809. @mock.patch(
  1810. "google.auth.metrics.python_and_auth_lib_version",
  1811. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  1812. )
  1813. def test_refresh_impersonation_with_lifetime(
  1814. self, mock_metrics_header_value, mock_auth_lib_value
  1815. ):
  1816. # Simulate service account access token expires in 2800 seconds.
  1817. expire_time = (
  1818. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
  1819. ).isoformat("T") + "Z"
  1820. expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
  1821. # STS token exchange request/response.
  1822. token_response = self.SUCCESS_RESPONSE.copy()
  1823. token_headers = {
  1824. "Content-Type": "application/x-www-form-urlencoded",
  1825. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/true",
  1826. }
  1827. token_request_data = {
  1828. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  1829. "audience": self.AUDIENCE,
  1830. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  1831. "subject_token": "subject_token_0",
  1832. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  1833. "scope": "https://www.googleapis.com/auth/iam",
  1834. }
  1835. # Service account impersonation request/response.
  1836. impersonation_response = {
  1837. "accessToken": "SA_ACCESS_TOKEN",
  1838. "expireTime": expire_time,
  1839. }
  1840. impersonation_headers = {
  1841. "Content-Type": "application/json",
  1842. "authorization": "Bearer {}".format(token_response["access_token"]),
  1843. "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  1844. "x-allowed-locations": "0x0",
  1845. }
  1846. impersonation_request_data = {
  1847. "delegates": None,
  1848. "scope": self.SCOPES,
  1849. "lifetime": "2800s",
  1850. }
  1851. # Initialize mock request to handle token exchange and service account
  1852. # impersonation request.
  1853. request = self.make_mock_request(
  1854. status=http_client.OK,
  1855. data=token_response,
  1856. impersonation_status=http_client.OK,
  1857. impersonation_data=impersonation_response,
  1858. )
  1859. # Initialize credentials with service account impersonation.
  1860. credentials = self.make_credentials(
  1861. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  1862. service_account_impersonation_options={"token_lifetime_seconds": 2800},
  1863. scopes=self.SCOPES,
  1864. )
  1865. credentials.refresh(request)
  1866. # Only 2 requests should be processed.
  1867. assert len(request.call_args_list) == 2
  1868. # Verify token exchange request parameters.
  1869. self.assert_token_request_kwargs(
  1870. request.call_args_list[0][1], token_headers, token_request_data
  1871. )
  1872. # Verify service account impersonation request parameters.
  1873. self.assert_impersonation_request_kwargs(
  1874. request.call_args_list[1][1],
  1875. impersonation_headers,
  1876. impersonation_request_data,
  1877. )
  1878. assert credentials.valid
  1879. assert credentials.expiry == expected_expiry
  1880. assert not credentials.expired
  1881. assert credentials.token == impersonation_response["accessToken"]
  1882. def test_get_project_id_cloud_resource_manager_error(self):
  1883. # Simulate resource doesn't have sufficient permissions to access
  1884. # cloud resource manager.
  1885. request = self.make_mock_request(
  1886. status=http_client.OK,
  1887. data=self.SUCCESS_RESPONSE.copy(),
  1888. cloud_resource_manager_status=http_client.UNAUTHORIZED,
  1889. )
  1890. credentials = self.make_credentials(scopes=self.SCOPES)
  1891. project_id = credentials.get_project_id(request)
  1892. assert project_id is None
  1893. # Only 2 requests to STS and cloud resource manager should be sent.
  1894. assert len(request.call_args_list) == 2
  1895. def test_supplier_context():
  1896. context = external_account.SupplierContext("TestTokenType", "TestAudience")
  1897. assert context.subject_token_type == "TestTokenType"
  1898. assert context.audience == "TestAudience"