test_external_account.py 84 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084
  1. # Copyright 2020 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import datetime
  15. import http.client as http_client
  16. import json
  17. import urllib
  18. import mock
  19. import pytest # type: ignore
  20. from google.auth import _helpers
  21. from google.auth import exceptions
  22. from google.auth import external_account
  23. from google.auth import transport
  24. from google.auth.credentials import DEFAULT_UNIVERSE_DOMAIN
  25. from google.auth.credentials import TokenState
  26. IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
  27. "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/imp"
  28. )
  29. LANG_LIBRARY_METRICS_HEADER_VALUE = "gl-python/3.7 auth/1.1"
  30. CLIENT_ID = "username"
  31. CLIENT_SECRET = "password"
  32. # Base64 encoding of "username:password"
  33. BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
  34. SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
  35. # List of valid workforce pool audiences.
  36. TEST_USER_AUDIENCES = [
  37. "//iam.googleapis.com/locations/global/workforcePools/pool-id/providers/provider-id",
  38. "//iam.googleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
  39. "//iam.googleapis.com/locations/eu/workforcePools/workloadIdentityPools/providers/provider-id",
  40. ]
  41. # Workload identity pool audiences or invalid workforce pool audiences.
  42. TEST_NON_USER_AUDIENCES = [
  43. # Legacy K8s audience format.
  44. "identitynamespace:1f12345:my_provider",
  45. (
  46. "//iam.googleapis.com/projects/123456/locations/"
  47. "global/workloadIdentityPools/pool-id/providers/"
  48. "provider-id"
  49. ),
  50. (
  51. "//iam.googleapis.com/projects/123456/locations/"
  52. "eu/workloadIdentityPools/pool-id/providers/"
  53. "provider-id"
  54. ),
  55. # Pool ID with workforcePools string.
  56. (
  57. "//iam.googleapis.com/projects/123456/locations/"
  58. "global/workloadIdentityPools/workforcePools/providers/"
  59. "provider-id"
  60. ),
  61. # Unrealistic / incorrect workforce pool audiences.
  62. "//iamgoogleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
  63. "//iam.googleapiscom/locations/eu/workforcePools/pool-id/providers/provider-id",
  64. "//iam.googleapis.com/locations/workforcePools/pool-id/providers/provider-id",
  65. "//iam.googleapis.com/locations/eu/workforcePool/pool-id/providers/provider-id",
  66. "//iam.googleapis.com/locations//workforcePool/pool-id/providers/provider-id",
  67. ]
  68. class CredentialsImpl(external_account.Credentials):
  69. def __init__(self, **kwargs):
  70. super(CredentialsImpl, self).__init__(**kwargs)
  71. self._counter = 0
  72. def retrieve_subject_token(self, request):
  73. counter = self._counter
  74. self._counter += 1
  75. return "subject_token_{}".format(counter)
  76. class TestCredentials(object):
  77. TOKEN_URL = "https://sts.googleapis.com/v1/token"
  78. TOKEN_INFO_URL = "https://sts.googleapis.com/v1/introspect"
  79. PROJECT_NUMBER = "123456"
  80. POOL_ID = "POOL_ID"
  81. PROVIDER_ID = "PROVIDER_ID"
  82. AUDIENCE = (
  83. "//iam.googleapis.com/projects/{}"
  84. "/locations/global/workloadIdentityPools/{}"
  85. "/providers/{}"
  86. ).format(PROJECT_NUMBER, POOL_ID, PROVIDER_ID)
  87. WORKFORCE_AUDIENCE = (
  88. "//iam.googleapis.com/locations/global/workforcePools/{}/providers/{}"
  89. ).format(POOL_ID, PROVIDER_ID)
  90. WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER"
  91. SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
  92. WORKFORCE_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token"
  93. CREDENTIAL_SOURCE = {"file": "/var/run/secrets/goog.id/token"}
  94. SUCCESS_RESPONSE = {
  95. "access_token": "ACCESS_TOKEN",
  96. "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
  97. "token_type": "Bearer",
  98. "expires_in": 3600,
  99. "scope": "scope1 scope2",
  100. }
  101. ERROR_RESPONSE = {
  102. "error": "invalid_request",
  103. "error_description": "Invalid subject token",
  104. "error_uri": "https://tools.ietf.org/html/rfc6749",
  105. }
  106. QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
  107. SERVICE_ACCOUNT_IMPERSONATION_URL = (
  108. "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
  109. + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
  110. )
  111. SCOPES = ["scope1", "scope2"]
  112. IMPERSONATION_ERROR_RESPONSE = {
  113. "error": {
  114. "code": 400,
  115. "message": "Request contains an invalid argument",
  116. "status": "INVALID_ARGUMENT",
  117. }
  118. }
  119. PROJECT_ID = "my-proj-id"
  120. CLOUD_RESOURCE_MANAGER_URL = (
  121. "https://cloudresourcemanager.googleapis.com/v1/projects/"
  122. )
  123. CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE = {
  124. "projectNumber": PROJECT_NUMBER,
  125. "projectId": PROJECT_ID,
  126. "lifecycleState": "ACTIVE",
  127. "name": "project-name",
  128. "createTime": "2018-11-06T04:42:54.109Z",
  129. "parent": {"type": "folder", "id": "12345678901"},
  130. }
  131. @classmethod
  132. def make_credentials(
  133. cls,
  134. client_id=None,
  135. client_secret=None,
  136. quota_project_id=None,
  137. token_info_url=None,
  138. scopes=None,
  139. default_scopes=None,
  140. service_account_impersonation_url=None,
  141. service_account_impersonation_options={},
  142. universe_domain=DEFAULT_UNIVERSE_DOMAIN,
  143. ):
  144. return CredentialsImpl(
  145. audience=cls.AUDIENCE,
  146. subject_token_type=cls.SUBJECT_TOKEN_TYPE,
  147. token_url=cls.TOKEN_URL,
  148. token_info_url=token_info_url,
  149. service_account_impersonation_url=service_account_impersonation_url,
  150. service_account_impersonation_options=service_account_impersonation_options,
  151. credential_source=cls.CREDENTIAL_SOURCE,
  152. client_id=client_id,
  153. client_secret=client_secret,
  154. quota_project_id=quota_project_id,
  155. scopes=scopes,
  156. default_scopes=default_scopes,
  157. universe_domain=universe_domain,
  158. )
  159. @classmethod
  160. def make_workforce_pool_credentials(
  161. cls,
  162. client_id=None,
  163. client_secret=None,
  164. quota_project_id=None,
  165. scopes=None,
  166. default_scopes=None,
  167. service_account_impersonation_url=None,
  168. workforce_pool_user_project=None,
  169. ):
  170. return CredentialsImpl(
  171. audience=cls.WORKFORCE_AUDIENCE,
  172. subject_token_type=cls.WORKFORCE_SUBJECT_TOKEN_TYPE,
  173. token_url=cls.TOKEN_URL,
  174. service_account_impersonation_url=service_account_impersonation_url,
  175. credential_source=cls.CREDENTIAL_SOURCE,
  176. client_id=client_id,
  177. client_secret=client_secret,
  178. quota_project_id=quota_project_id,
  179. scopes=scopes,
  180. default_scopes=default_scopes,
  181. workforce_pool_user_project=workforce_pool_user_project,
  182. )
  183. @classmethod
  184. def make_mock_request(
  185. cls,
  186. status=http_client.OK,
  187. data=None,
  188. impersonation_status=None,
  189. impersonation_data=None,
  190. cloud_resource_manager_status=None,
  191. cloud_resource_manager_data=None,
  192. ):
  193. # STS token exchange request.
  194. token_response = mock.create_autospec(transport.Response, instance=True)
  195. token_response.status = status
  196. token_response.data = json.dumps(data).encode("utf-8")
  197. responses = [token_response]
  198. # If service account impersonation is requested, mock the expected response.
  199. if impersonation_status:
  200. impersonation_response = mock.create_autospec(
  201. transport.Response, instance=True
  202. )
  203. impersonation_response.status = impersonation_status
  204. impersonation_response.data = json.dumps(impersonation_data).encode("utf-8")
  205. responses.append(impersonation_response)
  206. # If cloud resource manager is requested, mock the expected response.
  207. if cloud_resource_manager_status:
  208. cloud_resource_manager_response = mock.create_autospec(
  209. transport.Response, instance=True
  210. )
  211. cloud_resource_manager_response.status = cloud_resource_manager_status
  212. cloud_resource_manager_response.data = json.dumps(
  213. cloud_resource_manager_data
  214. ).encode("utf-8")
  215. responses.append(cloud_resource_manager_response)
  216. request = mock.create_autospec(transport.Request)
  217. request.side_effect = responses
  218. return request
  219. @classmethod
  220. def assert_token_request_kwargs(
  221. cls, request_kwargs, headers, request_data, cert=None
  222. ):
  223. assert request_kwargs["url"] == cls.TOKEN_URL
  224. assert request_kwargs["method"] == "POST"
  225. assert request_kwargs["headers"] == headers
  226. if cert is not None:
  227. assert request_kwargs["cert"] == cert
  228. else:
  229. assert "cert" not in request_kwargs
  230. assert request_kwargs["body"] is not None
  231. body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
  232. for (k, v) in body_tuples:
  233. assert v.decode("utf-8") == request_data[k.decode("utf-8")]
  234. assert len(body_tuples) == len(request_data.keys())
  235. @classmethod
  236. def assert_impersonation_request_kwargs(
  237. cls, request_kwargs, headers, request_data, cert=None
  238. ):
  239. assert request_kwargs["url"] == cls.SERVICE_ACCOUNT_IMPERSONATION_URL
  240. assert request_kwargs["method"] == "POST"
  241. assert request_kwargs["headers"] == headers
  242. if cert is not None:
  243. assert request_kwargs["cert"] == cert
  244. else:
  245. assert "cert" not in request_kwargs
  246. assert request_kwargs["body"] is not None
  247. body_json = json.loads(request_kwargs["body"].decode("utf-8"))
  248. assert body_json == request_data
  249. @classmethod
  250. def assert_resource_manager_request_kwargs(
  251. cls, request_kwargs, project_number, headers
  252. ):
  253. assert request_kwargs["url"] == cls.CLOUD_RESOURCE_MANAGER_URL + project_number
  254. assert request_kwargs["method"] == "GET"
  255. assert request_kwargs["headers"] == headers
  256. assert "body" not in request_kwargs
  257. def test_default_state(self):
  258. credentials = self.make_credentials(
  259. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
  260. )
  261. # Token url and service account impersonation url should be set
  262. assert credentials._token_url
  263. assert credentials._service_account_impersonation_url
  264. # Not token acquired yet
  265. assert not credentials.token
  266. assert not credentials.valid
  267. # Expiration hasn't been set yet
  268. assert not credentials.expiry
  269. assert not credentials.expired
  270. # Scopes are required
  271. assert not credentials.scopes
  272. assert credentials.requires_scopes
  273. assert not credentials.quota_project_id
  274. # Token info url not set yet
  275. assert not credentials.token_info_url
  276. def test_nonworkforce_with_workforce_pool_user_project(self):
  277. with pytest.raises(ValueError) as excinfo:
  278. CredentialsImpl(
  279. audience=self.AUDIENCE,
  280. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  281. token_url=self.TOKEN_URL,
  282. credential_source=self.CREDENTIAL_SOURCE,
  283. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
  284. )
  285. assert excinfo.match(
  286. "workforce_pool_user_project should not be set for non-workforce "
  287. "pool credentials"
  288. )
  289. def test_with_scopes(self):
  290. credentials = self.make_credentials()
  291. assert not credentials.scopes
  292. assert credentials.requires_scopes
  293. scoped_credentials = credentials.with_scopes(["email"])
  294. assert scoped_credentials.has_scopes(["email"])
  295. assert not scoped_credentials.requires_scopes
  296. def test_with_scopes_workforce_pool(self):
  297. credentials = self.make_workforce_pool_credentials(
  298. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
  299. )
  300. assert not credentials.scopes
  301. assert credentials.requires_scopes
  302. scoped_credentials = credentials.with_scopes(["email"])
  303. assert scoped_credentials.has_scopes(["email"])
  304. assert not scoped_credentials.requires_scopes
  305. assert (
  306. scoped_credentials.info.get("workforce_pool_user_project")
  307. == self.WORKFORCE_POOL_USER_PROJECT
  308. )
  309. def test_with_scopes_using_user_and_default_scopes(self):
  310. credentials = self.make_credentials()
  311. assert not credentials.scopes
  312. assert credentials.requires_scopes
  313. scoped_credentials = credentials.with_scopes(
  314. ["email"], default_scopes=["profile"]
  315. )
  316. assert scoped_credentials.has_scopes(["email"])
  317. assert not scoped_credentials.has_scopes(["profile"])
  318. assert not scoped_credentials.requires_scopes
  319. assert scoped_credentials.scopes == ["email"]
  320. assert scoped_credentials.default_scopes == ["profile"]
  321. def test_with_scopes_using_default_scopes_only(self):
  322. credentials = self.make_credentials()
  323. assert not credentials.scopes
  324. assert credentials.requires_scopes
  325. scoped_credentials = credentials.with_scopes(None, default_scopes=["profile"])
  326. assert scoped_credentials.has_scopes(["profile"])
  327. assert not scoped_credentials.requires_scopes
  328. def test_with_scopes_full_options_propagated(self):
  329. credentials = self.make_credentials(
  330. client_id=CLIENT_ID,
  331. client_secret=CLIENT_SECRET,
  332. quota_project_id=self.QUOTA_PROJECT_ID,
  333. scopes=self.SCOPES,
  334. token_info_url=self.TOKEN_INFO_URL,
  335. default_scopes=["default1"],
  336. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  337. service_account_impersonation_options={"token_lifetime_seconds": 2800},
  338. )
  339. with mock.patch.object(
  340. external_account.Credentials, "__init__", return_value=None
  341. ) as mock_init:
  342. credentials.with_scopes(["email"], ["default2"])
  343. # Confirm with_scopes initialized the credential with the expected
  344. # parameters and scopes.
  345. mock_init.assert_called_once_with(
  346. audience=self.AUDIENCE,
  347. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  348. token_url=self.TOKEN_URL,
  349. token_info_url=self.TOKEN_INFO_URL,
  350. credential_source=self.CREDENTIAL_SOURCE,
  351. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  352. service_account_impersonation_options={"token_lifetime_seconds": 2800},
  353. client_id=CLIENT_ID,
  354. client_secret=CLIENT_SECRET,
  355. quota_project_id=self.QUOTA_PROJECT_ID,
  356. scopes=["email"],
  357. default_scopes=["default2"],
  358. universe_domain=DEFAULT_UNIVERSE_DOMAIN,
  359. )
  360. def test_with_token_uri(self):
  361. credentials = self.make_credentials()
  362. new_token_uri = "https://eu-sts.googleapis.com/v1/token"
  363. assert credentials._token_url == self.TOKEN_URL
  364. creds_with_new_token_uri = credentials.with_token_uri(new_token_uri)
  365. assert creds_with_new_token_uri._token_url == new_token_uri
  366. def test_with_token_uri_workforce_pool(self):
  367. credentials = self.make_workforce_pool_credentials(
  368. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
  369. )
  370. new_token_uri = "https://eu-sts.googleapis.com/v1/token"
  371. assert credentials._token_url == self.TOKEN_URL
  372. creds_with_new_token_uri = credentials.with_token_uri(new_token_uri)
  373. assert creds_with_new_token_uri._token_url == new_token_uri
  374. assert (
  375. creds_with_new_token_uri.info.get("workforce_pool_user_project")
  376. == self.WORKFORCE_POOL_USER_PROJECT
  377. )
  378. def test_with_quota_project(self):
  379. credentials = self.make_credentials()
  380. assert not credentials.scopes
  381. assert not credentials.quota_project_id
  382. quota_project_creds = credentials.with_quota_project("project-foo")
  383. assert quota_project_creds.quota_project_id == "project-foo"
  384. def test_with_quota_project_workforce_pool(self):
  385. credentials = self.make_workforce_pool_credentials(
  386. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
  387. )
  388. assert not credentials.scopes
  389. assert not credentials.quota_project_id
  390. quota_project_creds = credentials.with_quota_project("project-foo")
  391. assert quota_project_creds.quota_project_id == "project-foo"
  392. assert (
  393. quota_project_creds.info.get("workforce_pool_user_project")
  394. == self.WORKFORCE_POOL_USER_PROJECT
  395. )
  396. def test_with_quota_project_full_options_propagated(self):
  397. credentials = self.make_credentials(
  398. client_id=CLIENT_ID,
  399. client_secret=CLIENT_SECRET,
  400. token_info_url=self.TOKEN_INFO_URL,
  401. quota_project_id=self.QUOTA_PROJECT_ID,
  402. scopes=self.SCOPES,
  403. default_scopes=["default1"],
  404. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  405. service_account_impersonation_options={"token_lifetime_seconds": 2800},
  406. )
  407. with mock.patch.object(
  408. external_account.Credentials, "__init__", return_value=None
  409. ) as mock_init:
  410. credentials.with_quota_project("project-foo")
  411. # Confirm with_quota_project initialized the credential with the
  412. # expected parameters and quota project ID.
  413. mock_init.assert_called_once_with(
  414. audience=self.AUDIENCE,
  415. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  416. token_url=self.TOKEN_URL,
  417. token_info_url=self.TOKEN_INFO_URL,
  418. credential_source=self.CREDENTIAL_SOURCE,
  419. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  420. service_account_impersonation_options={"token_lifetime_seconds": 2800},
  421. client_id=CLIENT_ID,
  422. client_secret=CLIENT_SECRET,
  423. quota_project_id="project-foo",
  424. scopes=self.SCOPES,
  425. default_scopes=["default1"],
  426. universe_domain=DEFAULT_UNIVERSE_DOMAIN,
  427. )
  428. def test_info(self):
  429. credentials = self.make_credentials(universe_domain="dummy_universe.com")
  430. assert credentials.info == {
  431. "type": "external_account",
  432. "audience": self.AUDIENCE,
  433. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  434. "token_url": self.TOKEN_URL,
  435. "credential_source": self.CREDENTIAL_SOURCE.copy(),
  436. "universe_domain": "dummy_universe.com",
  437. }
  438. def test_universe_domain(self):
  439. credentials = self.make_credentials(universe_domain="dummy_universe.com")
  440. assert credentials.universe_domain == "dummy_universe.com"
  441. credentials = self.make_credentials()
  442. assert credentials.universe_domain == DEFAULT_UNIVERSE_DOMAIN
  443. def test_with_universe_domain(self):
  444. credentials = self.make_credentials()
  445. new_credentials = credentials.with_universe_domain("dummy_universe.com")
  446. assert new_credentials.universe_domain == "dummy_universe.com"
  447. def test_info_workforce_pool(self):
  448. credentials = self.make_workforce_pool_credentials(
  449. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
  450. )
  451. assert credentials.info == {
  452. "type": "external_account",
  453. "audience": self.WORKFORCE_AUDIENCE,
  454. "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
  455. "token_url": self.TOKEN_URL,
  456. "credential_source": self.CREDENTIAL_SOURCE.copy(),
  457. "workforce_pool_user_project": self.WORKFORCE_POOL_USER_PROJECT,
  458. "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
  459. }
  460. def test_info_with_full_options(self):
  461. credentials = self.make_credentials(
  462. client_id=CLIENT_ID,
  463. client_secret=CLIENT_SECRET,
  464. quota_project_id=self.QUOTA_PROJECT_ID,
  465. token_info_url=self.TOKEN_INFO_URL,
  466. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  467. service_account_impersonation_options={"token_lifetime_seconds": 2800},
  468. )
  469. assert credentials.info == {
  470. "type": "external_account",
  471. "audience": self.AUDIENCE,
  472. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  473. "token_url": self.TOKEN_URL,
  474. "token_info_url": self.TOKEN_INFO_URL,
  475. "service_account_impersonation_url": self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  476. "service_account_impersonation": {"token_lifetime_seconds": 2800},
  477. "credential_source": self.CREDENTIAL_SOURCE.copy(),
  478. "quota_project_id": self.QUOTA_PROJECT_ID,
  479. "client_id": CLIENT_ID,
  480. "client_secret": CLIENT_SECRET,
  481. "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
  482. }
  483. def test_service_account_email_without_impersonation(self):
  484. credentials = self.make_credentials()
  485. assert credentials.service_account_email is None
  486. def test_service_account_email_with_impersonation(self):
  487. credentials = self.make_credentials(
  488. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
  489. )
  490. assert credentials.service_account_email == SERVICE_ACCOUNT_EMAIL
  491. @pytest.mark.parametrize("audience", TEST_NON_USER_AUDIENCES)
  492. def test_is_user_with_non_users(self, audience):
  493. credentials = CredentialsImpl(
  494. audience=audience,
  495. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  496. token_url=self.TOKEN_URL,
  497. credential_source=self.CREDENTIAL_SOURCE,
  498. )
  499. assert credentials.is_user is False
  500. @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
  501. def test_is_user_with_users(self, audience):
  502. credentials = CredentialsImpl(
  503. audience=audience,
  504. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  505. token_url=self.TOKEN_URL,
  506. credential_source=self.CREDENTIAL_SOURCE,
  507. )
  508. assert credentials.is_user is True
  509. @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
  510. def test_is_user_with_users_and_impersonation(self, audience):
  511. # Initialize the credentials with service account impersonation.
  512. credentials = CredentialsImpl(
  513. audience=audience,
  514. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  515. token_url=self.TOKEN_URL,
  516. credential_source=self.CREDENTIAL_SOURCE,
  517. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  518. )
  519. # Even though the audience is for a workforce pool, since service account
  520. # impersonation is used, the credentials will represent a service account and
  521. # not a user.
  522. assert credentials.is_user is False
  523. @pytest.mark.parametrize("audience", TEST_NON_USER_AUDIENCES)
  524. def test_is_workforce_pool_with_non_users(self, audience):
  525. credentials = CredentialsImpl(
  526. audience=audience,
  527. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  528. token_url=self.TOKEN_URL,
  529. credential_source=self.CREDENTIAL_SOURCE,
  530. )
  531. assert credentials.is_workforce_pool is False
  532. @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
  533. def test_is_workforce_pool_with_users(self, audience):
  534. credentials = CredentialsImpl(
  535. audience=audience,
  536. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  537. token_url=self.TOKEN_URL,
  538. credential_source=self.CREDENTIAL_SOURCE,
  539. )
  540. assert credentials.is_workforce_pool is True
  541. @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
  542. def test_is_workforce_pool_with_users_and_impersonation(self, audience):
  543. # Initialize the credentials with workforce audience and service account
  544. # impersonation.
  545. credentials = CredentialsImpl(
  546. audience=audience,
  547. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  548. token_url=self.TOKEN_URL,
  549. credential_source=self.CREDENTIAL_SOURCE,
  550. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  551. )
  552. # Even though impersonation is used, is_workforce_pool should still return True.
  553. assert credentials.is_workforce_pool is True
  554. @pytest.mark.parametrize("mock_expires_in", [2800, "2800"])
  555. @mock.patch(
  556. "google.auth.metrics.python_and_auth_lib_version",
  557. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  558. )
  559. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  560. def test_refresh_without_client_auth_success(
  561. self, unused_utcnow, mock_auth_lib_value, mock_expires_in
  562. ):
  563. response = self.SUCCESS_RESPONSE.copy()
  564. # Test custom expiration to confirm expiry is set correctly.
  565. response["expires_in"] = mock_expires_in
  566. expected_expiry = datetime.datetime.min + datetime.timedelta(
  567. seconds=int(mock_expires_in)
  568. )
  569. headers = {
  570. "Content-Type": "application/x-www-form-urlencoded",
  571. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
  572. }
  573. request_data = {
  574. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  575. "audience": self.AUDIENCE,
  576. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  577. "subject_token": "subject_token_0",
  578. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  579. }
  580. request = self.make_mock_request(status=http_client.OK, data=response)
  581. credentials = self.make_credentials()
  582. credentials.refresh(request)
  583. self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
  584. assert credentials.valid
  585. assert credentials.expiry == expected_expiry
  586. assert not credentials.expired
  587. assert credentials.token == response["access_token"]
  588. @mock.patch(
  589. "google.auth.metrics.python_and_auth_lib_version",
  590. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  591. )
  592. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  593. @mock.patch(
  594. "google.auth.external_account.Credentials._mtls_required", return_value=True
  595. )
  596. @mock.patch(
  597. "google.auth.external_account.Credentials._get_mtls_cert_and_key_paths",
  598. return_value=("path/to/cert.pem", "path/to/key.pem"),
  599. )
  600. def test_refresh_with_mtls(
  601. self,
  602. mock_get_mtls_cert_and_key_paths,
  603. mock_mtls_required,
  604. unused_utcnow,
  605. mock_auth_lib_value,
  606. ):
  607. response = self.SUCCESS_RESPONSE.copy()
  608. # Test custom expiration to confirm expiry is set correctly.
  609. response["expires_in"] = 2800
  610. expected_expiry = datetime.datetime.min + datetime.timedelta(
  611. seconds=response["expires_in"]
  612. )
  613. headers = {
  614. "Content-Type": "application/x-www-form-urlencoded",
  615. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
  616. }
  617. request_data = {
  618. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  619. "audience": self.AUDIENCE,
  620. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  621. "subject_token": "subject_token_0",
  622. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  623. }
  624. request = self.make_mock_request(status=http_client.OK, data=response)
  625. credentials = self.make_credentials()
  626. credentials.refresh(request)
  627. expected_cert_path = ("path/to/cert.pem", "path/to/key.pem")
  628. self.assert_token_request_kwargs(
  629. request.call_args[1], headers, request_data, expected_cert_path
  630. )
  631. assert credentials.valid
  632. assert credentials.expiry == expected_expiry
  633. assert not credentials.expired
  634. assert credentials.token == response["access_token"]
  635. @mock.patch(
  636. "google.auth.metrics.python_and_auth_lib_version",
  637. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  638. )
  639. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  640. def test_refresh_workforce_without_client_auth_success(
  641. self, unused_utcnow, test_auth_lib_value
  642. ):
  643. response = self.SUCCESS_RESPONSE.copy()
  644. # Test custom expiration to confirm expiry is set correctly.
  645. response["expires_in"] = 2800
  646. expected_expiry = datetime.datetime.min + datetime.timedelta(
  647. seconds=response["expires_in"]
  648. )
  649. headers = {
  650. "Content-Type": "application/x-www-form-urlencoded",
  651. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
  652. }
  653. request_data = {
  654. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  655. "audience": self.WORKFORCE_AUDIENCE,
  656. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  657. "subject_token": "subject_token_0",
  658. "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
  659. "options": urllib.parse.quote(
  660. json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT})
  661. ),
  662. }
  663. request = self.make_mock_request(status=http_client.OK, data=response)
  664. credentials = self.make_workforce_pool_credentials(
  665. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
  666. )
  667. credentials.refresh(request)
  668. self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
  669. assert credentials.valid
  670. assert credentials.expiry == expected_expiry
  671. assert not credentials.expired
  672. assert credentials.token == response["access_token"]
  673. @mock.patch(
  674. "google.auth.metrics.python_and_auth_lib_version",
  675. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  676. )
  677. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  678. def test_refresh_workforce_with_client_auth_success(
  679. self, unused_utcnow, mock_auth_lib_value
  680. ):
  681. response = self.SUCCESS_RESPONSE.copy()
  682. # Test custom expiration to confirm expiry is set correctly.
  683. response["expires_in"] = 2800
  684. expected_expiry = datetime.datetime.min + datetime.timedelta(
  685. seconds=response["expires_in"]
  686. )
  687. headers = {
  688. "Content-Type": "application/x-www-form-urlencoded",
  689. "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
  690. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
  691. }
  692. request_data = {
  693. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  694. "audience": self.WORKFORCE_AUDIENCE,
  695. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  696. "subject_token": "subject_token_0",
  697. "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
  698. }
  699. request = self.make_mock_request(status=http_client.OK, data=response)
  700. # Client Auth will have higher priority over workforce_pool_user_project.
  701. credentials = self.make_workforce_pool_credentials(
  702. client_id=CLIENT_ID,
  703. client_secret=CLIENT_SECRET,
  704. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
  705. )
  706. credentials.refresh(request)
  707. self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
  708. assert credentials.valid
  709. assert credentials.expiry == expected_expiry
  710. assert not credentials.expired
  711. assert credentials.token == response["access_token"]
  712. @mock.patch(
  713. "google.auth.metrics.python_and_auth_lib_version",
  714. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  715. )
  716. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  717. def test_refresh_workforce_with_client_auth_and_no_workforce_project_success(
  718. self, unused_utcnow, mock_lib_version_value
  719. ):
  720. response = self.SUCCESS_RESPONSE.copy()
  721. # Test custom expiration to confirm expiry is set correctly.
  722. response["expires_in"] = 2800
  723. expected_expiry = datetime.datetime.min + datetime.timedelta(
  724. seconds=response["expires_in"]
  725. )
  726. headers = {
  727. "Content-Type": "application/x-www-form-urlencoded",
  728. "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
  729. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
  730. }
  731. request_data = {
  732. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  733. "audience": self.WORKFORCE_AUDIENCE,
  734. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  735. "subject_token": "subject_token_0",
  736. "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
  737. }
  738. request = self.make_mock_request(status=http_client.OK, data=response)
  739. # Client Auth will be sufficient for user project determination.
  740. credentials = self.make_workforce_pool_credentials(
  741. client_id=CLIENT_ID,
  742. client_secret=CLIENT_SECRET,
  743. workforce_pool_user_project=None,
  744. )
  745. credentials.refresh(request)
  746. self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
  747. assert credentials.valid
  748. assert credentials.expiry == expected_expiry
  749. assert not credentials.expired
  750. assert credentials.token == response["access_token"]
  751. @mock.patch(
  752. "google.auth.metrics.token_request_access_token_impersonate",
  753. return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  754. )
  755. @mock.patch(
  756. "google.auth.metrics.python_and_auth_lib_version",
  757. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  758. )
  759. def test_refresh_impersonation_without_client_auth_success(
  760. self, mock_metrics_header_value, mock_auth_lib_value
  761. ):
  762. # Simulate service account access token expires in 2800 seconds.
  763. expire_time = (
  764. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
  765. ).isoformat("T") + "Z"
  766. expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
  767. # STS token exchange request/response.
  768. token_response = self.SUCCESS_RESPONSE.copy()
  769. token_headers = {
  770. "Content-Type": "application/x-www-form-urlencoded",
  771. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
  772. }
  773. token_request_data = {
  774. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  775. "audience": self.AUDIENCE,
  776. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  777. "subject_token": "subject_token_0",
  778. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  779. "scope": "https://www.googleapis.com/auth/iam",
  780. }
  781. # Service account impersonation request/response.
  782. impersonation_response = {
  783. "accessToken": "SA_ACCESS_TOKEN",
  784. "expireTime": expire_time,
  785. }
  786. impersonation_headers = {
  787. "Content-Type": "application/json",
  788. "authorization": "Bearer {}".format(token_response["access_token"]),
  789. "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  790. "x-allowed-locations": "0x0",
  791. }
  792. impersonation_request_data = {
  793. "delegates": None,
  794. "scope": self.SCOPES,
  795. "lifetime": "3600s",
  796. }
  797. # Initialize mock request to handle token exchange and service account
  798. # impersonation request.
  799. request = self.make_mock_request(
  800. status=http_client.OK,
  801. data=token_response,
  802. impersonation_status=http_client.OK,
  803. impersonation_data=impersonation_response,
  804. )
  805. # Initialize credentials with service account impersonation.
  806. credentials = self.make_credentials(
  807. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  808. scopes=self.SCOPES,
  809. )
  810. credentials.refresh(request)
  811. # Only 2 requests should be processed.
  812. assert len(request.call_args_list) == 2
  813. # Verify token exchange request parameters.
  814. self.assert_token_request_kwargs(
  815. request.call_args_list[0][1], token_headers, token_request_data
  816. )
  817. # Verify service account impersonation request parameters.
  818. self.assert_impersonation_request_kwargs(
  819. request.call_args_list[1][1],
  820. impersonation_headers,
  821. impersonation_request_data,
  822. )
  823. assert credentials.valid
  824. assert credentials.expiry == expected_expiry
  825. assert not credentials.expired
  826. assert credentials.token == impersonation_response["accessToken"]
  827. @mock.patch(
  828. "google.auth.metrics.token_request_access_token_impersonate",
  829. return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  830. )
  831. @mock.patch(
  832. "google.auth.metrics.python_and_auth_lib_version",
  833. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  834. )
  835. @mock.patch(
  836. "google.auth.external_account.Credentials._mtls_required", return_value=True
  837. )
  838. @mock.patch(
  839. "google.auth.external_account.Credentials._get_mtls_cert_and_key_paths",
  840. return_value=("path/to/cert.pem", "path/to/key.pem"),
  841. )
  842. def test_refresh_impersonation_with_mtls_success(
  843. self,
  844. mock_get_mtls_cert_and_key_paths,
  845. mock_mtls_required,
  846. mock_metrics_header_value,
  847. mock_auth_lib_value,
  848. ):
  849. # Simulate service account access token expires in 2800 seconds.
  850. expire_time = (
  851. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
  852. ).isoformat("T") + "Z"
  853. expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
  854. # STS token exchange request/response.
  855. token_response = self.SUCCESS_RESPONSE.copy()
  856. token_headers = {
  857. "Content-Type": "application/x-www-form-urlencoded",
  858. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
  859. }
  860. token_request_data = {
  861. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  862. "audience": self.AUDIENCE,
  863. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  864. "subject_token": "subject_token_0",
  865. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  866. "scope": "https://www.googleapis.com/auth/iam",
  867. }
  868. # Service account impersonation request/response.
  869. impersonation_response = {
  870. "accessToken": "SA_ACCESS_TOKEN",
  871. "expireTime": expire_time,
  872. }
  873. impersonation_headers = {
  874. "Content-Type": "application/json",
  875. "authorization": "Bearer {}".format(token_response["access_token"]),
  876. "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  877. "x-allowed-locations": "0x0",
  878. }
  879. impersonation_request_data = {
  880. "delegates": None,
  881. "scope": self.SCOPES,
  882. "lifetime": "3600s",
  883. }
  884. # Initialize mock request to handle token exchange and service account
  885. # impersonation request.
  886. request = self.make_mock_request(
  887. status=http_client.OK,
  888. data=token_response,
  889. impersonation_status=http_client.OK,
  890. impersonation_data=impersonation_response,
  891. )
  892. # Initialize credentials with service account impersonation.
  893. credentials = self.make_credentials(
  894. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  895. scopes=self.SCOPES,
  896. )
  897. credentials.refresh(request)
  898. # Only 2 requests should be processed.
  899. assert len(request.call_args_list) == 2
  900. # Verify token exchange request parameters.
  901. expected_cert_paths = ("path/to/cert.pem", "path/to/key.pem")
  902. self.assert_token_request_kwargs(
  903. request.call_args_list[0][1],
  904. token_headers,
  905. token_request_data,
  906. expected_cert_paths,
  907. )
  908. # Verify service account impersonation request parameters.
  909. self.assert_impersonation_request_kwargs(
  910. request.call_args_list[1][1],
  911. impersonation_headers,
  912. impersonation_request_data,
  913. expected_cert_paths,
  914. )
  915. assert credentials.valid
  916. assert credentials.expiry == expected_expiry
  917. assert not credentials.expired
  918. assert credentials.token == impersonation_response["accessToken"]
  919. @mock.patch(
  920. "google.auth.metrics.token_request_access_token_impersonate",
  921. return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  922. )
  923. @mock.patch(
  924. "google.auth.metrics.python_and_auth_lib_version",
  925. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  926. )
  927. def test_refresh_workforce_impersonation_without_client_auth_success(
  928. self, mock_metrics_header_value, mock_auth_lib_value
  929. ):
  930. # Simulate service account access token expires in 2800 seconds.
  931. expire_time = (
  932. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
  933. ).isoformat("T") + "Z"
  934. expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
  935. # STS token exchange request/response.
  936. token_response = self.SUCCESS_RESPONSE.copy()
  937. token_headers = {
  938. "Content-Type": "application/x-www-form-urlencoded",
  939. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
  940. }
  941. token_request_data = {
  942. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  943. "audience": self.WORKFORCE_AUDIENCE,
  944. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  945. "subject_token": "subject_token_0",
  946. "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
  947. "scope": "https://www.googleapis.com/auth/iam",
  948. "options": urllib.parse.quote(
  949. json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT})
  950. ),
  951. }
  952. # Service account impersonation request/response.
  953. impersonation_response = {
  954. "accessToken": "SA_ACCESS_TOKEN",
  955. "expireTime": expire_time,
  956. }
  957. impersonation_headers = {
  958. "Content-Type": "application/json",
  959. "authorization": "Bearer {}".format(token_response["access_token"]),
  960. "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  961. "x-allowed-locations": "0x0",
  962. }
  963. impersonation_request_data = {
  964. "delegates": None,
  965. "scope": self.SCOPES,
  966. "lifetime": "3600s",
  967. }
  968. # Initialize mock request to handle token exchange and service account
  969. # impersonation request.
  970. request = self.make_mock_request(
  971. status=http_client.OK,
  972. data=token_response,
  973. impersonation_status=http_client.OK,
  974. impersonation_data=impersonation_response,
  975. )
  976. # Initialize credentials with service account impersonation.
  977. credentials = self.make_workforce_pool_credentials(
  978. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  979. scopes=self.SCOPES,
  980. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
  981. )
  982. credentials.refresh(request)
  983. # Only 2 requests should be processed.
  984. assert len(request.call_args_list) == 2
  985. # Verify token exchange request parameters.
  986. self.assert_token_request_kwargs(
  987. request.call_args_list[0][1], token_headers, token_request_data
  988. )
  989. # Verify service account impersonation request parameters.
  990. self.assert_impersonation_request_kwargs(
  991. request.call_args_list[1][1],
  992. impersonation_headers,
  993. impersonation_request_data,
  994. )
  995. assert credentials.valid
  996. assert credentials.expiry == expected_expiry
  997. assert not credentials.expired
  998. assert credentials.token == impersonation_response["accessToken"]
  999. @mock.patch(
  1000. "google.auth.metrics.python_and_auth_lib_version",
  1001. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  1002. )
  1003. def test_refresh_without_client_auth_success_explicit_user_scopes_ignore_default_scopes(
  1004. self, mock_auth_lib_value
  1005. ):
  1006. headers = {
  1007. "Content-Type": "application/x-www-form-urlencoded",
  1008. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
  1009. }
  1010. request_data = {
  1011. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  1012. "audience": self.AUDIENCE,
  1013. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  1014. "scope": "scope1 scope2",
  1015. "subject_token": "subject_token_0",
  1016. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  1017. }
  1018. request = self.make_mock_request(
  1019. status=http_client.OK, data=self.SUCCESS_RESPONSE
  1020. )
  1021. credentials = self.make_credentials(
  1022. scopes=["scope1", "scope2"],
  1023. # Default scopes will be ignored in favor of user scopes.
  1024. default_scopes=["ignored"],
  1025. )
  1026. credentials.refresh(request)
  1027. self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
  1028. assert credentials.valid
  1029. assert not credentials.expired
  1030. assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
  1031. assert credentials.has_scopes(["scope1", "scope2"])
  1032. assert not credentials.has_scopes(["ignored"])
  1033. @mock.patch(
  1034. "google.auth.metrics.python_and_auth_lib_version",
  1035. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  1036. )
  1037. def test_refresh_without_client_auth_success_explicit_default_scopes_only(
  1038. self, mock_auth_lib_value
  1039. ):
  1040. headers = {
  1041. "Content-Type": "application/x-www-form-urlencoded",
  1042. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
  1043. }
  1044. request_data = {
  1045. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  1046. "audience": self.AUDIENCE,
  1047. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  1048. "scope": "scope1 scope2",
  1049. "subject_token": "subject_token_0",
  1050. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  1051. }
  1052. request = self.make_mock_request(
  1053. status=http_client.OK, data=self.SUCCESS_RESPONSE
  1054. )
  1055. credentials = self.make_credentials(
  1056. scopes=None,
  1057. # Default scopes will be used since user scopes are none.
  1058. default_scopes=["scope1", "scope2"],
  1059. )
  1060. credentials.refresh(request)
  1061. self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
  1062. assert credentials.valid
  1063. assert not credentials.expired
  1064. assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
  1065. assert credentials.has_scopes(["scope1", "scope2"])
  1066. def test_refresh_without_client_auth_error(self):
  1067. request = self.make_mock_request(
  1068. status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
  1069. )
  1070. credentials = self.make_credentials()
  1071. with pytest.raises(exceptions.OAuthError) as excinfo:
  1072. credentials.refresh(request)
  1073. assert excinfo.match(
  1074. r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
  1075. )
  1076. assert not credentials.expired
  1077. assert credentials.token is None
  1078. def test_refresh_impersonation_without_client_auth_error(self):
  1079. request = self.make_mock_request(
  1080. status=http_client.OK,
  1081. data=self.SUCCESS_RESPONSE,
  1082. impersonation_status=http_client.BAD_REQUEST,
  1083. impersonation_data=self.IMPERSONATION_ERROR_RESPONSE,
  1084. )
  1085. credentials = self.make_credentials(
  1086. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  1087. scopes=self.SCOPES,
  1088. )
  1089. with pytest.raises(exceptions.RefreshError) as excinfo:
  1090. credentials.refresh(request)
  1091. assert excinfo.match(r"Unable to acquire impersonated credentials")
  1092. assert not credentials.expired
  1093. assert credentials.token is None
  1094. def test_refresh_impersonation_invalid_impersonated_url_error(self):
  1095. credentials = self.make_credentials(
  1096. service_account_impersonation_url="https://iamcredentials.googleapis.com/v1/invalid",
  1097. scopes=self.SCOPES,
  1098. )
  1099. with pytest.raises(exceptions.RefreshError) as excinfo:
  1100. credentials.refresh(None)
  1101. assert excinfo.match(
  1102. r"Unable to determine target principal from service account impersonation URL."
  1103. )
  1104. assert not credentials.expired
  1105. assert credentials.token is None
  1106. @mock.patch(
  1107. "google.auth.metrics.python_and_auth_lib_version",
  1108. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  1109. )
  1110. def test_refresh_with_client_auth_success(self, mock_auth_lib_value):
  1111. headers = {
  1112. "Content-Type": "application/x-www-form-urlencoded",
  1113. "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
  1114. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
  1115. }
  1116. request_data = {
  1117. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  1118. "audience": self.AUDIENCE,
  1119. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  1120. "subject_token": "subject_token_0",
  1121. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  1122. }
  1123. request = self.make_mock_request(
  1124. status=http_client.OK, data=self.SUCCESS_RESPONSE
  1125. )
  1126. credentials = self.make_credentials(
  1127. client_id=CLIENT_ID, client_secret=CLIENT_SECRET
  1128. )
  1129. credentials.refresh(request)
  1130. self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
  1131. assert credentials.valid
  1132. assert not credentials.expired
  1133. assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
  1134. @mock.patch(
  1135. "google.auth.metrics.token_request_access_token_impersonate",
  1136. return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  1137. )
  1138. @mock.patch(
  1139. "google.auth.metrics.python_and_auth_lib_version",
  1140. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  1141. )
  1142. def test_refresh_impersonation_with_client_auth_success_ignore_default_scopes(
  1143. self, mock_metrics_header_value, mock_auth_lib_value
  1144. ):
  1145. # Simulate service account access token expires in 2800 seconds.
  1146. expire_time = (
  1147. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
  1148. ).isoformat("T") + "Z"
  1149. expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
  1150. # STS token exchange request/response.
  1151. token_response = self.SUCCESS_RESPONSE.copy()
  1152. token_headers = {
  1153. "Content-Type": "application/x-www-form-urlencoded",
  1154. "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
  1155. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
  1156. }
  1157. token_request_data = {
  1158. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  1159. "audience": self.AUDIENCE,
  1160. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  1161. "subject_token": "subject_token_0",
  1162. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  1163. "scope": "https://www.googleapis.com/auth/iam",
  1164. }
  1165. # Service account impersonation request/response.
  1166. impersonation_response = {
  1167. "accessToken": "SA_ACCESS_TOKEN",
  1168. "expireTime": expire_time,
  1169. }
  1170. impersonation_headers = {
  1171. "Content-Type": "application/json",
  1172. "authorization": "Bearer {}".format(token_response["access_token"]),
  1173. "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  1174. "x-allowed-locations": "0x0",
  1175. }
  1176. impersonation_request_data = {
  1177. "delegates": None,
  1178. "scope": self.SCOPES,
  1179. "lifetime": "3600s",
  1180. }
  1181. # Initialize mock request to handle token exchange and service account
  1182. # impersonation request.
  1183. request = self.make_mock_request(
  1184. status=http_client.OK,
  1185. data=token_response,
  1186. impersonation_status=http_client.OK,
  1187. impersonation_data=impersonation_response,
  1188. )
  1189. # Initialize credentials with service account impersonation and basic auth.
  1190. credentials = self.make_credentials(
  1191. client_id=CLIENT_ID,
  1192. client_secret=CLIENT_SECRET,
  1193. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  1194. scopes=self.SCOPES,
  1195. # Default scopes will be ignored since user scopes are specified.
  1196. default_scopes=["ignored"],
  1197. )
  1198. credentials.refresh(request)
  1199. # Only 2 requests should be processed.
  1200. assert len(request.call_args_list) == 2
  1201. # Verify token exchange request parameters.
  1202. self.assert_token_request_kwargs(
  1203. request.call_args_list[0][1], token_headers, token_request_data
  1204. )
  1205. # Verify service account impersonation request parameters.
  1206. self.assert_impersonation_request_kwargs(
  1207. request.call_args_list[1][1],
  1208. impersonation_headers,
  1209. impersonation_request_data,
  1210. )
  1211. assert credentials.valid
  1212. assert credentials.expiry == expected_expiry
  1213. assert not credentials.expired
  1214. assert credentials.token == impersonation_response["accessToken"]
  1215. @mock.patch(
  1216. "google.auth.metrics.token_request_access_token_impersonate",
  1217. return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  1218. )
  1219. @mock.patch(
  1220. "google.auth.metrics.python_and_auth_lib_version",
  1221. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  1222. )
  1223. def test_refresh_impersonation_with_client_auth_success_use_default_scopes(
  1224. self, mock_metrics_header_value, mock_auth_lib_value
  1225. ):
  1226. # Simulate service account access token expires in 2800 seconds.
  1227. expire_time = (
  1228. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
  1229. ).isoformat("T") + "Z"
  1230. expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
  1231. # STS token exchange request/response.
  1232. token_response = self.SUCCESS_RESPONSE.copy()
  1233. token_headers = {
  1234. "Content-Type": "application/x-www-form-urlencoded",
  1235. "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
  1236. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
  1237. }
  1238. token_request_data = {
  1239. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  1240. "audience": self.AUDIENCE,
  1241. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  1242. "subject_token": "subject_token_0",
  1243. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  1244. "scope": "https://www.googleapis.com/auth/iam",
  1245. }
  1246. # Service account impersonation request/response.
  1247. impersonation_response = {
  1248. "accessToken": "SA_ACCESS_TOKEN",
  1249. "expireTime": expire_time,
  1250. }
  1251. impersonation_headers = {
  1252. "Content-Type": "application/json",
  1253. "authorization": "Bearer {}".format(token_response["access_token"]),
  1254. "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  1255. "x-allowed-locations": "0x0",
  1256. }
  1257. impersonation_request_data = {
  1258. "delegates": None,
  1259. "scope": self.SCOPES,
  1260. "lifetime": "3600s",
  1261. }
  1262. # Initialize mock request to handle token exchange and service account
  1263. # impersonation request.
  1264. request = self.make_mock_request(
  1265. status=http_client.OK,
  1266. data=token_response,
  1267. impersonation_status=http_client.OK,
  1268. impersonation_data=impersonation_response,
  1269. )
  1270. # Initialize credentials with service account impersonation and basic auth.
  1271. credentials = self.make_credentials(
  1272. client_id=CLIENT_ID,
  1273. client_secret=CLIENT_SECRET,
  1274. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  1275. scopes=None,
  1276. # Default scopes will be used since user specified scopes are none.
  1277. default_scopes=self.SCOPES,
  1278. )
  1279. credentials.refresh(request)
  1280. # Only 2 requests should be processed.
  1281. assert len(request.call_args_list) == 2
  1282. # Verify token exchange request parameters.
  1283. self.assert_token_request_kwargs(
  1284. request.call_args_list[0][1], token_headers, token_request_data
  1285. )
  1286. # Verify service account impersonation request parameters.
  1287. self.assert_impersonation_request_kwargs(
  1288. request.call_args_list[1][1],
  1289. impersonation_headers,
  1290. impersonation_request_data,
  1291. )
  1292. assert credentials.valid
  1293. assert credentials.expiry == expected_expiry
  1294. assert not credentials.expired
  1295. assert credentials.token == impersonation_response["accessToken"]
  1296. def test_apply_without_quota_project_id(self):
  1297. headers = {}
  1298. request = self.make_mock_request(
  1299. status=http_client.OK, data=self.SUCCESS_RESPONSE
  1300. )
  1301. credentials = self.make_credentials()
  1302. credentials.refresh(request)
  1303. credentials.apply(headers)
  1304. assert headers == {
  1305. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
  1306. "x-allowed-locations": "0x0",
  1307. }
  1308. def test_apply_workforce_without_quota_project_id(self):
  1309. headers = {}
  1310. request = self.make_mock_request(
  1311. status=http_client.OK, data=self.SUCCESS_RESPONSE
  1312. )
  1313. credentials = self.make_workforce_pool_credentials(
  1314. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
  1315. )
  1316. credentials.refresh(request)
  1317. credentials.apply(headers)
  1318. assert headers == {
  1319. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
  1320. "x-allowed-locations": "0x0",
  1321. }
  1322. def test_apply_impersonation_without_quota_project_id(self):
  1323. expire_time = (
  1324. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
  1325. ).isoformat("T") + "Z"
  1326. # Service account impersonation response.
  1327. impersonation_response = {
  1328. "accessToken": "SA_ACCESS_TOKEN",
  1329. "expireTime": expire_time,
  1330. }
  1331. # Initialize mock request to handle token exchange and service account
  1332. # impersonation request.
  1333. request = self.make_mock_request(
  1334. status=http_client.OK,
  1335. data=self.SUCCESS_RESPONSE.copy(),
  1336. impersonation_status=http_client.OK,
  1337. impersonation_data=impersonation_response,
  1338. )
  1339. # Initialize credentials with service account impersonation.
  1340. credentials = self.make_credentials(
  1341. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  1342. scopes=self.SCOPES,
  1343. )
  1344. headers = {}
  1345. credentials.refresh(request)
  1346. credentials.apply(headers)
  1347. assert headers == {
  1348. "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
  1349. "x-allowed-locations": "0x0",
  1350. }
  1351. def test_apply_with_quota_project_id(self):
  1352. headers = {"other": "header-value"}
  1353. request = self.make_mock_request(
  1354. status=http_client.OK, data=self.SUCCESS_RESPONSE
  1355. )
  1356. credentials = self.make_credentials(quota_project_id=self.QUOTA_PROJECT_ID)
  1357. credentials.refresh(request)
  1358. credentials.apply(headers)
  1359. assert headers == {
  1360. "other": "header-value",
  1361. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
  1362. "x-goog-user-project": self.QUOTA_PROJECT_ID,
  1363. "x-allowed-locations": "0x0",
  1364. }
  1365. def test_apply_impersonation_with_quota_project_id(self):
  1366. expire_time = (
  1367. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
  1368. ).isoformat("T") + "Z"
  1369. # Service account impersonation response.
  1370. impersonation_response = {
  1371. "accessToken": "SA_ACCESS_TOKEN",
  1372. "expireTime": expire_time,
  1373. }
  1374. # Initialize mock request to handle token exchange and service account
  1375. # impersonation request.
  1376. request = self.make_mock_request(
  1377. status=http_client.OK,
  1378. data=self.SUCCESS_RESPONSE.copy(),
  1379. impersonation_status=http_client.OK,
  1380. impersonation_data=impersonation_response,
  1381. )
  1382. # Initialize credentials with service account impersonation.
  1383. credentials = self.make_credentials(
  1384. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  1385. scopes=self.SCOPES,
  1386. quota_project_id=self.QUOTA_PROJECT_ID,
  1387. )
  1388. headers = {"other": "header-value"}
  1389. credentials.refresh(request)
  1390. credentials.apply(headers)
  1391. assert headers == {
  1392. "other": "header-value",
  1393. "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
  1394. "x-goog-user-project": self.QUOTA_PROJECT_ID,
  1395. "x-allowed-locations": "0x0",
  1396. }
  1397. def test_before_request(self):
  1398. headers = {"other": "header-value"}
  1399. request = self.make_mock_request(
  1400. status=http_client.OK, data=self.SUCCESS_RESPONSE
  1401. )
  1402. credentials = self.make_credentials()
  1403. # First call should call refresh, setting the token.
  1404. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1405. assert headers == {
  1406. "other": "header-value",
  1407. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
  1408. "x-allowed-locations": "0x0",
  1409. }
  1410. # Second call shouldn't call refresh.
  1411. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1412. assert headers == {
  1413. "other": "header-value",
  1414. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
  1415. "x-allowed-locations": "0x0",
  1416. }
  1417. def test_before_request_workforce(self):
  1418. headers = {"other": "header-value"}
  1419. request = self.make_mock_request(
  1420. status=http_client.OK, data=self.SUCCESS_RESPONSE
  1421. )
  1422. credentials = self.make_workforce_pool_credentials(
  1423. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
  1424. )
  1425. # First call should call refresh, setting the token.
  1426. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1427. assert headers == {
  1428. "other": "header-value",
  1429. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
  1430. "x-allowed-locations": "0x0",
  1431. }
  1432. # Second call shouldn't call refresh.
  1433. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1434. assert headers == {
  1435. "other": "header-value",
  1436. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
  1437. "x-allowed-locations": "0x0",
  1438. }
  1439. def test_before_request_impersonation(self):
  1440. expire_time = (
  1441. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
  1442. ).isoformat("T") + "Z"
  1443. # Service account impersonation response.
  1444. impersonation_response = {
  1445. "accessToken": "SA_ACCESS_TOKEN",
  1446. "expireTime": expire_time,
  1447. }
  1448. # Initialize mock request to handle token exchange and service account
  1449. # impersonation request.
  1450. request = self.make_mock_request(
  1451. status=http_client.OK,
  1452. data=self.SUCCESS_RESPONSE.copy(),
  1453. impersonation_status=http_client.OK,
  1454. impersonation_data=impersonation_response,
  1455. )
  1456. headers = {"other": "header-value"}
  1457. credentials = self.make_credentials(
  1458. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
  1459. )
  1460. # First call should call refresh, setting the token.
  1461. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1462. assert headers == {
  1463. "other": "header-value",
  1464. "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
  1465. "x-allowed-locations": "0x0",
  1466. }
  1467. # Second call shouldn't call refresh.
  1468. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1469. assert headers == {
  1470. "other": "header-value",
  1471. "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
  1472. "x-allowed-locations": "0x0",
  1473. }
  1474. @mock.patch("google.auth._helpers.utcnow")
  1475. def test_before_request_expired(self, utcnow):
  1476. headers = {}
  1477. request = self.make_mock_request(
  1478. status=http_client.OK, data=self.SUCCESS_RESPONSE
  1479. )
  1480. credentials = self.make_credentials()
  1481. credentials.token = "token"
  1482. utcnow.return_value = datetime.datetime.min
  1483. # Set the expiration to one second more than now plus the clock skew
  1484. # accomodation. These credentials should be valid.
  1485. credentials.expiry = (
  1486. datetime.datetime.min
  1487. + _helpers.REFRESH_THRESHOLD
  1488. + datetime.timedelta(seconds=1)
  1489. )
  1490. assert credentials.valid
  1491. assert not credentials.expired
  1492. assert credentials.token_state == TokenState.FRESH
  1493. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1494. # Cached token should be used.
  1495. assert headers == {
  1496. "authorization": "Bearer token",
  1497. "x-allowed-locations": "0x0",
  1498. }
  1499. # Next call should simulate 1 second passed.
  1500. utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)
  1501. assert not credentials.valid
  1502. assert credentials.expired
  1503. assert credentials.token_state == TokenState.STALE
  1504. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1505. assert credentials.token_state == TokenState.FRESH
  1506. # New token should be retrieved.
  1507. assert headers == {
  1508. "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
  1509. "x-allowed-locations": "0x0",
  1510. }
  1511. @mock.patch("google.auth._helpers.utcnow")
  1512. def test_before_request_impersonation_expired(self, utcnow):
  1513. headers = {}
  1514. expire_time = (
  1515. datetime.datetime.min + datetime.timedelta(seconds=3601)
  1516. ).isoformat("T") + "Z"
  1517. # Service account impersonation response.
  1518. impersonation_response = {
  1519. "accessToken": "SA_ACCESS_TOKEN",
  1520. "expireTime": expire_time,
  1521. }
  1522. # Initialize mock request to handle token exchange and service account
  1523. # impersonation request.
  1524. request = self.make_mock_request(
  1525. status=http_client.OK,
  1526. data=self.SUCCESS_RESPONSE.copy(),
  1527. impersonation_status=http_client.OK,
  1528. impersonation_data=impersonation_response,
  1529. )
  1530. credentials = self.make_credentials(
  1531. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
  1532. )
  1533. credentials.token = "token"
  1534. utcnow.return_value = datetime.datetime.min
  1535. # Set the expiration to one second more than now plus the clock skew
  1536. # accomodation. These credentials should be valid.
  1537. credentials.expiry = (
  1538. datetime.datetime.min
  1539. + _helpers.REFRESH_THRESHOLD
  1540. + datetime.timedelta(seconds=1)
  1541. )
  1542. assert credentials.valid
  1543. assert not credentials.expired
  1544. assert credentials.token_state == TokenState.FRESH
  1545. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1546. assert credentials.token_state == TokenState.FRESH
  1547. # Cached token should be used.
  1548. assert headers == {
  1549. "authorization": "Bearer token",
  1550. "x-allowed-locations": "0x0",
  1551. }
  1552. # Next call should simulate 1 second passed. This will trigger the expiration
  1553. # threshold.
  1554. utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)
  1555. assert not credentials.valid
  1556. assert credentials.expired
  1557. assert credentials.token_state == TokenState.STALE
  1558. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1559. assert credentials.token_state == TokenState.FRESH
  1560. credentials.before_request(request, "POST", "https://example.com/api", headers)
  1561. # New token should be retrieved.
  1562. assert headers == {
  1563. "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
  1564. "x-allowed-locations": "0x0",
  1565. }
  1566. @pytest.mark.parametrize(
  1567. "audience",
  1568. [
  1569. # Legacy K8s audience format.
  1570. "identitynamespace:1f12345:my_provider",
  1571. # Unrealistic audiences.
  1572. "//iam.googleapis.com/projects",
  1573. "//iam.googleapis.com/projects/",
  1574. "//iam.googleapis.com/project/123456",
  1575. "//iam.googleapis.com/projects//123456",
  1576. "//iam.googleapis.com/prefix_projects/123456",
  1577. "//iam.googleapis.com/projects_suffix/123456",
  1578. ],
  1579. )
  1580. def test_project_number_indeterminable(self, audience):
  1581. credentials = CredentialsImpl(
  1582. audience=audience,
  1583. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  1584. token_url=self.TOKEN_URL,
  1585. credential_source=self.CREDENTIAL_SOURCE,
  1586. )
  1587. assert credentials.project_number is None
  1588. assert credentials.get_project_id(None) is None
  1589. def test_project_number_determinable(self):
  1590. credentials = CredentialsImpl(
  1591. audience=self.AUDIENCE,
  1592. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  1593. token_url=self.TOKEN_URL,
  1594. credential_source=self.CREDENTIAL_SOURCE,
  1595. )
  1596. assert credentials.project_number == self.PROJECT_NUMBER
  1597. def test_project_number_workforce(self):
  1598. credentials = CredentialsImpl(
  1599. audience=self.WORKFORCE_AUDIENCE,
  1600. subject_token_type=self.WORKFORCE_SUBJECT_TOKEN_TYPE,
  1601. token_url=self.TOKEN_URL,
  1602. credential_source=self.CREDENTIAL_SOURCE,
  1603. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
  1604. )
  1605. assert credentials.project_number is None
  1606. def test_project_id_without_scopes(self):
  1607. # Initialize credentials with no scopes.
  1608. credentials = CredentialsImpl(
  1609. audience=self.AUDIENCE,
  1610. subject_token_type=self.SUBJECT_TOKEN_TYPE,
  1611. token_url=self.TOKEN_URL,
  1612. credential_source=self.CREDENTIAL_SOURCE,
  1613. )
  1614. assert credentials.get_project_id(None) is None
  1615. @mock.patch(
  1616. "google.auth.metrics.token_request_access_token_impersonate",
  1617. return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  1618. )
  1619. @mock.patch(
  1620. "google.auth.metrics.python_and_auth_lib_version",
  1621. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  1622. )
  1623. def test_get_project_id_cloud_resource_manager_success(
  1624. self, mock_metrics_header_value, mock_auth_lib_value
  1625. ):
  1626. # STS token exchange request/response.
  1627. token_response = self.SUCCESS_RESPONSE.copy()
  1628. token_headers = {
  1629. "Content-Type": "application/x-www-form-urlencoded",
  1630. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false",
  1631. }
  1632. token_request_data = {
  1633. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  1634. "audience": self.AUDIENCE,
  1635. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  1636. "subject_token": "subject_token_0",
  1637. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  1638. "scope": "https://www.googleapis.com/auth/iam",
  1639. }
  1640. # Service account impersonation request/response.
  1641. expire_time = (
  1642. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
  1643. ).isoformat("T") + "Z"
  1644. expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
  1645. impersonation_response = {
  1646. "accessToken": "SA_ACCESS_TOKEN",
  1647. "expireTime": expire_time,
  1648. }
  1649. impersonation_headers = {
  1650. "Content-Type": "application/json",
  1651. "x-goog-user-project": self.QUOTA_PROJECT_ID,
  1652. "authorization": "Bearer {}".format(token_response["access_token"]),
  1653. "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  1654. "x-allowed-locations": "0x0",
  1655. }
  1656. impersonation_request_data = {
  1657. "delegates": None,
  1658. "scope": self.SCOPES,
  1659. "lifetime": "3600s",
  1660. }
  1661. # Initialize mock request to handle token exchange, service account
  1662. # impersonation and cloud resource manager request.
  1663. request = self.make_mock_request(
  1664. status=http_client.OK,
  1665. data=self.SUCCESS_RESPONSE.copy(),
  1666. impersonation_status=http_client.OK,
  1667. impersonation_data=impersonation_response,
  1668. cloud_resource_manager_status=http_client.OK,
  1669. cloud_resource_manager_data=self.CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE,
  1670. )
  1671. credentials = self.make_credentials(
  1672. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  1673. scopes=self.SCOPES,
  1674. quota_project_id=self.QUOTA_PROJECT_ID,
  1675. )
  1676. # Expected project ID from cloud resource manager response should be returned.
  1677. project_id = credentials.get_project_id(request)
  1678. assert project_id == self.PROJECT_ID
  1679. # 3 requests should be processed.
  1680. assert len(request.call_args_list) == 3
  1681. # Verify token exchange request parameters.
  1682. self.assert_token_request_kwargs(
  1683. request.call_args_list[0][1], token_headers, token_request_data
  1684. )
  1685. # Verify service account impersonation request parameters.
  1686. self.assert_impersonation_request_kwargs(
  1687. request.call_args_list[1][1],
  1688. impersonation_headers,
  1689. impersonation_request_data,
  1690. )
  1691. # In the process of getting project ID, an access token should be
  1692. # retrieved.
  1693. assert credentials.valid
  1694. assert credentials.expiry == expected_expiry
  1695. assert not credentials.expired
  1696. assert credentials.token == impersonation_response["accessToken"]
  1697. # Verify cloud resource manager request parameters.
  1698. self.assert_resource_manager_request_kwargs(
  1699. request.call_args_list[2][1],
  1700. self.PROJECT_NUMBER,
  1701. {
  1702. "x-goog-user-project": self.QUOTA_PROJECT_ID,
  1703. "authorization": "Bearer {}".format(
  1704. impersonation_response["accessToken"]
  1705. ),
  1706. "x-allowed-locations": "0x0",
  1707. },
  1708. )
  1709. # Calling get_project_id again should return the cached project_id.
  1710. project_id = credentials.get_project_id(request)
  1711. assert project_id == self.PROJECT_ID
  1712. # No additional requests.
  1713. assert len(request.call_args_list) == 3
  1714. @mock.patch(
  1715. "google.auth.metrics.python_and_auth_lib_version",
  1716. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  1717. )
  1718. def test_workforce_pool_get_project_id_cloud_resource_manager_success(
  1719. self, mock_auth_lib_value
  1720. ):
  1721. # STS token exchange request/response.
  1722. token_headers = {
  1723. "Content-Type": "application/x-www-form-urlencoded",
  1724. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false",
  1725. }
  1726. token_request_data = {
  1727. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  1728. "audience": self.WORKFORCE_AUDIENCE,
  1729. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  1730. "subject_token": "subject_token_0",
  1731. "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
  1732. "scope": "scope1 scope2",
  1733. "options": urllib.parse.quote(
  1734. json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT})
  1735. ),
  1736. }
  1737. # Initialize mock request to handle token exchange and cloud resource
  1738. # manager request.
  1739. request = self.make_mock_request(
  1740. status=http_client.OK,
  1741. data=self.SUCCESS_RESPONSE.copy(),
  1742. cloud_resource_manager_status=http_client.OK,
  1743. cloud_resource_manager_data=self.CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE,
  1744. )
  1745. credentials = self.make_workforce_pool_credentials(
  1746. scopes=self.SCOPES,
  1747. quota_project_id=self.QUOTA_PROJECT_ID,
  1748. workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
  1749. )
  1750. # Expected project ID from cloud resource manager response should be returned.
  1751. project_id = credentials.get_project_id(request)
  1752. assert project_id == self.PROJECT_ID
  1753. # 2 requests should be processed.
  1754. assert len(request.call_args_list) == 2
  1755. # Verify token exchange request parameters.
  1756. self.assert_token_request_kwargs(
  1757. request.call_args_list[0][1], token_headers, token_request_data
  1758. )
  1759. # In the process of getting project ID, an access token should be
  1760. # retrieved.
  1761. assert credentials.valid
  1762. assert not credentials.expired
  1763. assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
  1764. # Verify cloud resource manager request parameters.
  1765. self.assert_resource_manager_request_kwargs(
  1766. request.call_args_list[1][1],
  1767. self.WORKFORCE_POOL_USER_PROJECT,
  1768. {
  1769. "x-goog-user-project": self.QUOTA_PROJECT_ID,
  1770. "authorization": "Bearer {}".format(
  1771. self.SUCCESS_RESPONSE["access_token"]
  1772. ),
  1773. "x-allowed-locations": "0x0",
  1774. },
  1775. )
  1776. # Calling get_project_id again should return the cached project_id.
  1777. project_id = credentials.get_project_id(request)
  1778. assert project_id == self.PROJECT_ID
  1779. # No additional requests.
  1780. assert len(request.call_args_list) == 2
  1781. @mock.patch(
  1782. "google.auth.metrics.token_request_access_token_impersonate",
  1783. return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  1784. )
  1785. @mock.patch(
  1786. "google.auth.metrics.python_and_auth_lib_version",
  1787. return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
  1788. )
  1789. def test_refresh_impersonation_with_lifetime(
  1790. self, mock_metrics_header_value, mock_auth_lib_value
  1791. ):
  1792. # Simulate service account access token expires in 2800 seconds.
  1793. expire_time = (
  1794. _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
  1795. ).isoformat("T") + "Z"
  1796. expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
  1797. # STS token exchange request/response.
  1798. token_response = self.SUCCESS_RESPONSE.copy()
  1799. token_headers = {
  1800. "Content-Type": "application/x-www-form-urlencoded",
  1801. "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/true",
  1802. }
  1803. token_request_data = {
  1804. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  1805. "audience": self.AUDIENCE,
  1806. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  1807. "subject_token": "subject_token_0",
  1808. "subject_token_type": self.SUBJECT_TOKEN_TYPE,
  1809. "scope": "https://www.googleapis.com/auth/iam",
  1810. }
  1811. # Service account impersonation request/response.
  1812. impersonation_response = {
  1813. "accessToken": "SA_ACCESS_TOKEN",
  1814. "expireTime": expire_time,
  1815. }
  1816. impersonation_headers = {
  1817. "Content-Type": "application/json",
  1818. "authorization": "Bearer {}".format(token_response["access_token"]),
  1819. "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  1820. "x-allowed-locations": "0x0",
  1821. }
  1822. impersonation_request_data = {
  1823. "delegates": None,
  1824. "scope": self.SCOPES,
  1825. "lifetime": "2800s",
  1826. }
  1827. # Initialize mock request to handle token exchange and service account
  1828. # impersonation request.
  1829. request = self.make_mock_request(
  1830. status=http_client.OK,
  1831. data=token_response,
  1832. impersonation_status=http_client.OK,
  1833. impersonation_data=impersonation_response,
  1834. )
  1835. # Initialize credentials with service account impersonation.
  1836. credentials = self.make_credentials(
  1837. service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
  1838. service_account_impersonation_options={"token_lifetime_seconds": 2800},
  1839. scopes=self.SCOPES,
  1840. )
  1841. credentials.refresh(request)
  1842. # Only 2 requests should be processed.
  1843. assert len(request.call_args_list) == 2
  1844. # Verify token exchange request parameters.
  1845. self.assert_token_request_kwargs(
  1846. request.call_args_list[0][1], token_headers, token_request_data
  1847. )
  1848. # Verify service account impersonation request parameters.
  1849. self.assert_impersonation_request_kwargs(
  1850. request.call_args_list[1][1],
  1851. impersonation_headers,
  1852. impersonation_request_data,
  1853. )
  1854. assert credentials.valid
  1855. assert credentials.expiry == expected_expiry
  1856. assert not credentials.expired
  1857. assert credentials.token == impersonation_response["accessToken"]
  1858. def test_get_project_id_cloud_resource_manager_error(self):
  1859. # Simulate resource doesn't have sufficient permissions to access
  1860. # cloud resource manager.
  1861. request = self.make_mock_request(
  1862. status=http_client.OK,
  1863. data=self.SUCCESS_RESPONSE.copy(),
  1864. cloud_resource_manager_status=http_client.UNAUTHORIZED,
  1865. )
  1866. credentials = self.make_credentials(scopes=self.SCOPES)
  1867. project_id = credentials.get_project_id(request)
  1868. assert project_id is None
  1869. # Only 2 requests to STS and cloud resource manager should be sent.
  1870. assert len(request.call_args_list) == 2
  1871. def test_supplier_context():
  1872. context = external_account.SupplierContext("TestTokenType", "TestAudience")
  1873. assert context.subject_token_type == "TestTokenType"
  1874. assert context.audience == "TestAudience"