test_identity_pool.py 63 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613
  1. # Copyright 2020 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import datetime
  15. import http.client as http_client
  16. import json
  17. import os
  18. import urllib
  19. import mock
  20. import pytest # type: ignore
  21. from google.auth import _helpers, external_account
  22. from google.auth import exceptions
  23. from google.auth import identity_pool
  24. from google.auth import metrics
  25. from google.auth import transport
  26. from google.auth.credentials import DEFAULT_UNIVERSE_DOMAIN
  27. CLIENT_ID = "username"
  28. CLIENT_SECRET = "password"
  29. # Base64 encoding of "username:password".
  30. BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
  31. SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
  32. SERVICE_ACCOUNT_IMPERSONATION_URL_BASE = (
  33. "https://us-east1-iamcredentials.googleapis.com"
  34. )
  35. SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE = "/v1/projects/-/serviceAccounts/{}:generateAccessToken".format(
  36. SERVICE_ACCOUNT_EMAIL
  37. )
  38. SERVICE_ACCOUNT_IMPERSONATION_URL = (
  39. SERVICE_ACCOUNT_IMPERSONATION_URL_BASE + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE
  40. )
  41. QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
  42. SCOPES = ["scope1", "scope2"]
  43. import yatest.common as yc
  44. DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "data")
  45. SUBJECT_TOKEN_TEXT_FILE = os.path.join(DATA_DIR, "external_subject_token.txt")
  46. SUBJECT_TOKEN_JSON_FILE = os.path.join(DATA_DIR, "external_subject_token.json")
  47. SUBJECT_TOKEN_FIELD_NAME = "access_token"
  48. with open(SUBJECT_TOKEN_TEXT_FILE) as fh:
  49. TEXT_FILE_SUBJECT_TOKEN = fh.read()
  50. with open(SUBJECT_TOKEN_JSON_FILE) as fh:
  51. JSON_FILE_CONTENT = json.load(fh)
  52. JSON_FILE_SUBJECT_TOKEN = JSON_FILE_CONTENT.get(SUBJECT_TOKEN_FIELD_NAME)
  53. TOKEN_URL = "https://sts.googleapis.com/v1/token"
  54. TOKEN_INFO_URL = "https://sts.googleapis.com/v1/introspect"
  55. SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
  56. AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
  57. WORKFORCE_AUDIENCE = (
  58. "//iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID"
  59. )
  60. WORKFORCE_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token"
  61. WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER"
  62. VALID_TOKEN_URLS = [
  63. "https://sts.googleapis.com",
  64. "https://us-east-1.sts.googleapis.com",
  65. "https://US-EAST-1.sts.googleapis.com",
  66. "https://sts.us-east-1.googleapis.com",
  67. "https://sts.US-WEST-1.googleapis.com",
  68. "https://us-east-1-sts.googleapis.com",
  69. "https://US-WEST-1-sts.googleapis.com",
  70. "https://us-west-1-sts.googleapis.com/path?query",
  71. "https://sts-us-east-1.p.googleapis.com",
  72. ]
  73. INVALID_TOKEN_URLS = [
  74. "https://iamcredentials.googleapis.com",
  75. "sts.googleapis.com",
  76. "https://",
  77. "http://sts.googleapis.com",
  78. "https://st.s.googleapis.com",
  79. "https://us-eas\t-1.sts.googleapis.com",
  80. "https:/us-east-1.sts.googleapis.com",
  81. "https://US-WE/ST-1-sts.googleapis.com",
  82. "https://sts-us-east-1.googleapis.com",
  83. "https://sts-US-WEST-1.googleapis.com",
  84. "testhttps://us-east-1.sts.googleapis.com",
  85. "https://us-east-1.sts.googleapis.comevil.com",
  86. "https://us-east-1.us-east-1.sts.googleapis.com",
  87. "https://us-ea.s.t.sts.googleapis.com",
  88. "https://sts.googleapis.comevil.com",
  89. "hhttps://us-east-1.sts.googleapis.com",
  90. "https://us- -1.sts.googleapis.com",
  91. "https://-sts.googleapis.com",
  92. "https://us-east-1.sts.googleapis.com.evil.com",
  93. "https://sts.pgoogleapis.com",
  94. "https://p.googleapis.com",
  95. "https://sts.p.com",
  96. "http://sts.p.googleapis.com",
  97. "https://xyz-sts.p.googleapis.com",
  98. "https://sts-xyz.123.p.googleapis.com",
  99. "https://sts-xyz.p1.googleapis.com",
  100. "https://sts-xyz.p.foo.com",
  101. "https://sts-xyz.p.foo.googleapis.com",
  102. ]
  103. VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS = [
  104. "https://iamcredentials.googleapis.com",
  105. "https://us-east-1.iamcredentials.googleapis.com",
  106. "https://US-EAST-1.iamcredentials.googleapis.com",
  107. "https://iamcredentials.us-east-1.googleapis.com",
  108. "https://iamcredentials.US-WEST-1.googleapis.com",
  109. "https://us-east-1-iamcredentials.googleapis.com",
  110. "https://US-WEST-1-iamcredentials.googleapis.com",
  111. "https://us-west-1-iamcredentials.googleapis.com/path?query",
  112. "https://iamcredentials-us-east-1.p.googleapis.com",
  113. ]
  114. INVALID_SERVICE_ACCOUNT_IMPERSONATION_URLS = [
  115. "https://sts.googleapis.com",
  116. "iamcredentials.googleapis.com",
  117. "https://",
  118. "http://iamcredentials.googleapis.com",
  119. "https://iamcre.dentials.googleapis.com",
  120. "https://us-eas\t-1.iamcredentials.googleapis.com",
  121. "https:/us-east-1.iamcredentials.googleapis.com",
  122. "https://US-WE/ST-1-iamcredentials.googleapis.com",
  123. "https://iamcredentials-us-east-1.googleapis.com",
  124. "https://iamcredentials-US-WEST-1.googleapis.com",
  125. "testhttps://us-east-1.iamcredentials.googleapis.com",
  126. "https://us-east-1.iamcredentials.googleapis.comevil.com",
  127. "https://us-east-1.us-east-1.iamcredentials.googleapis.com",
  128. "https://us-ea.s.t.iamcredentials.googleapis.com",
  129. "https://iamcredentials.googleapis.comevil.com",
  130. "hhttps://us-east-1.iamcredentials.googleapis.com",
  131. "https://us- -1.iamcredentials.googleapis.com",
  132. "https://-iamcredentials.googleapis.com",
  133. "https://us-east-1.iamcredentials.googleapis.com.evil.com",
  134. "https://iamcredentials.pgoogleapis.com",
  135. "https://p.googleapis.com",
  136. "https://iamcredentials.p.com",
  137. "http://iamcredentials.p.googleapis.com",
  138. "https://xyz-iamcredentials.p.googleapis.com",
  139. "https://iamcredentials-xyz.123.p.googleapis.com",
  140. "https://iamcredentials-xyz.p1.googleapis.com",
  141. "https://iamcredentials-xyz.p.foo.com",
  142. "https://iamcredentials-xyz.p.foo.googleapis.com",
  143. ]
  144. class TestSubjectTokenSupplier(identity_pool.SubjectTokenSupplier):
  145. def __init__(
  146. self, subject_token=None, subject_token_exception=None, expected_context=None
  147. ):
  148. self._subject_token = subject_token
  149. self._subject_token_exception = subject_token_exception
  150. self._expected_context = expected_context
  151. def get_subject_token(self, context, request):
  152. if self._expected_context is not None:
  153. assert self._expected_context == context
  154. if self._subject_token_exception is not None:
  155. raise self._subject_token_exception
  156. return self._subject_token
  157. class TestCredentials(object):
  158. CREDENTIAL_SOURCE_TEXT = {"file": SUBJECT_TOKEN_TEXT_FILE}
  159. CREDENTIAL_SOURCE_JSON = {
  160. "file": SUBJECT_TOKEN_JSON_FILE,
  161. "format": {"type": "json", "subject_token_field_name": "access_token"},
  162. }
  163. CREDENTIAL_URL = "http://fakeurl.com"
  164. CREDENTIAL_SOURCE_TEXT_URL = {"url": CREDENTIAL_URL}
  165. CREDENTIAL_SOURCE_JSON_URL = {
  166. "url": CREDENTIAL_URL,
  167. "format": {"type": "json", "subject_token_field_name": "access_token"},
  168. }
  169. CREDENTIAL_SOURCE_CERTIFICATE = {
  170. "certificate": {"use_default_certificate_config": "true"}
  171. }
  172. CREDENTIAL_SOURCE_CERTIFICATE_NOT_DEFAULT = {
  173. "certificate": {"certificate_config_location": "path/to/config"}
  174. }
  175. SUCCESS_RESPONSE = {
  176. "access_token": "ACCESS_TOKEN",
  177. "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
  178. "token_type": "Bearer",
  179. "expires_in": 3600,
  180. "scope": " ".join(SCOPES),
  181. }
  182. @classmethod
  183. def make_mock_response(cls, status, data):
  184. response = mock.create_autospec(transport.Response, instance=True)
  185. response.status = status
  186. if isinstance(data, dict):
  187. response.data = json.dumps(data).encode("utf-8")
  188. else:
  189. response.data = data
  190. return response
  191. @classmethod
  192. def make_mock_request(
  193. cls, token_status=http_client.OK, token_data=None, *extra_requests
  194. ):
  195. responses = []
  196. responses.append(cls.make_mock_response(token_status, token_data))
  197. while len(extra_requests) > 0:
  198. # If service account impersonation is requested, mock the expected response.
  199. status, data, extra_requests = (
  200. extra_requests[0],
  201. extra_requests[1],
  202. extra_requests[2:],
  203. )
  204. responses.append(cls.make_mock_response(status, data))
  205. request = mock.create_autospec(transport.Request)
  206. request.side_effect = responses
  207. return request
  208. @classmethod
  209. def assert_credential_request_kwargs(
  210. cls, request_kwargs, headers, url=CREDENTIAL_URL
  211. ):
  212. assert request_kwargs["url"] == url
  213. assert request_kwargs["method"] == "GET"
  214. assert request_kwargs["headers"] == headers
  215. assert request_kwargs.get("body", None) is None
  216. @classmethod
  217. def assert_token_request_kwargs(
  218. cls, request_kwargs, headers, request_data, token_url=TOKEN_URL
  219. ):
  220. assert request_kwargs["url"] == token_url
  221. assert request_kwargs["method"] == "POST"
  222. assert request_kwargs["headers"] == headers
  223. assert request_kwargs["body"] is not None
  224. body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
  225. assert len(body_tuples) == len(request_data.keys())
  226. for (k, v) in body_tuples:
  227. assert v.decode("utf-8") == request_data[k.decode("utf-8")]
  228. @classmethod
  229. def assert_impersonation_request_kwargs(
  230. cls,
  231. request_kwargs,
  232. headers,
  233. request_data,
  234. service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
  235. ):
  236. assert request_kwargs["url"] == service_account_impersonation_url
  237. assert request_kwargs["method"] == "POST"
  238. assert request_kwargs["headers"] == headers
  239. assert request_kwargs["body"] is not None
  240. body_json = json.loads(request_kwargs["body"].decode("utf-8"))
  241. assert body_json == request_data
  242. @classmethod
  243. def assert_underlying_credentials_refresh(
  244. cls,
  245. credentials,
  246. audience,
  247. subject_token,
  248. subject_token_type,
  249. token_url,
  250. service_account_impersonation_url=None,
  251. basic_auth_encoding=None,
  252. quota_project_id=None,
  253. used_scopes=None,
  254. credential_data=None,
  255. scopes=None,
  256. default_scopes=None,
  257. workforce_pool_user_project=None,
  258. ):
  259. """Utility to assert that a credentials are initialized with the expected
  260. attributes by calling refresh functionality and confirming response matches
  261. expected one and that the underlying requests were populated with the
  262. expected parameters.
  263. """
  264. # STS token exchange request/response.
  265. token_response = cls.SUCCESS_RESPONSE.copy()
  266. token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
  267. if basic_auth_encoding:
  268. token_headers["Authorization"] = "Basic " + basic_auth_encoding
  269. metrics_options = {}
  270. if credentials._service_account_impersonation_url:
  271. metrics_options["sa-impersonation"] = "true"
  272. else:
  273. metrics_options["sa-impersonation"] = "false"
  274. metrics_options["config-lifetime"] = "false"
  275. if credentials._credential_source:
  276. if credentials._credential_source_file:
  277. metrics_options["source"] = "file"
  278. else:
  279. metrics_options["source"] = "url"
  280. else:
  281. metrics_options["source"] = "programmatic"
  282. token_headers["x-goog-api-client"] = metrics.byoid_metrics_header(
  283. metrics_options
  284. )
  285. if service_account_impersonation_url:
  286. token_scopes = "https://www.googleapis.com/auth/iam"
  287. else:
  288. token_scopes = " ".join(used_scopes or [])
  289. token_request_data = {
  290. "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
  291. "audience": audience,
  292. "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
  293. "scope": token_scopes,
  294. "subject_token": subject_token,
  295. "subject_token_type": subject_token_type,
  296. }
  297. if workforce_pool_user_project:
  298. token_request_data["options"] = urllib.parse.quote(
  299. json.dumps({"userProject": workforce_pool_user_project})
  300. )
  301. metrics_header_value = (
  302. "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/imp"
  303. )
  304. if service_account_impersonation_url:
  305. # Service account impersonation request/response.
  306. expire_time = (
  307. _helpers.utcnow().replace(microsecond=0)
  308. + datetime.timedelta(seconds=3600)
  309. ).isoformat("T") + "Z"
  310. impersonation_response = {
  311. "accessToken": "SA_ACCESS_TOKEN",
  312. "expireTime": expire_time,
  313. }
  314. impersonation_headers = {
  315. "Content-Type": "application/json",
  316. "authorization": "Bearer {}".format(token_response["access_token"]),
  317. "x-goog-api-client": metrics_header_value,
  318. "x-allowed-locations": "0x0",
  319. }
  320. impersonation_request_data = {
  321. "delegates": None,
  322. "scope": used_scopes,
  323. "lifetime": "3600s",
  324. }
  325. # Initialize mock request to handle token retrieval, token exchange and
  326. # service account impersonation request.
  327. requests = []
  328. if credential_data:
  329. requests.append((http_client.OK, credential_data))
  330. token_request_index = len(requests)
  331. requests.append((http_client.OK, token_response))
  332. if service_account_impersonation_url:
  333. impersonation_request_index = len(requests)
  334. requests.append((http_client.OK, impersonation_response))
  335. request = cls.make_mock_request(*[el for req in requests for el in req])
  336. with mock.patch(
  337. "google.auth.metrics.token_request_access_token_impersonate",
  338. return_value=metrics_header_value,
  339. ):
  340. credentials.refresh(request)
  341. assert len(request.call_args_list) == len(requests)
  342. if credential_data:
  343. cls.assert_credential_request_kwargs(request.call_args_list[0][1], None)
  344. # Verify token exchange request parameters.
  345. cls.assert_token_request_kwargs(
  346. request.call_args_list[token_request_index][1],
  347. token_headers,
  348. token_request_data,
  349. token_url,
  350. )
  351. # Verify service account impersonation request parameters if the request
  352. # is processed.
  353. if service_account_impersonation_url:
  354. cls.assert_impersonation_request_kwargs(
  355. request.call_args_list[impersonation_request_index][1],
  356. impersonation_headers,
  357. impersonation_request_data,
  358. service_account_impersonation_url,
  359. )
  360. assert credentials.token == impersonation_response["accessToken"]
  361. else:
  362. assert credentials.token == token_response["access_token"]
  363. assert credentials.quota_project_id == quota_project_id
  364. assert credentials.scopes == scopes
  365. assert credentials.default_scopes == default_scopes
  366. @classmethod
  367. def make_credentials(
  368. cls,
  369. audience=AUDIENCE,
  370. subject_token_type=SUBJECT_TOKEN_TYPE,
  371. token_url=TOKEN_URL,
  372. token_info_url=TOKEN_INFO_URL,
  373. client_id=None,
  374. client_secret=None,
  375. quota_project_id=None,
  376. scopes=None,
  377. default_scopes=None,
  378. service_account_impersonation_url=None,
  379. credential_source=None,
  380. subject_token_supplier=None,
  381. workforce_pool_user_project=None,
  382. ):
  383. return identity_pool.Credentials(
  384. audience=audience,
  385. subject_token_type=subject_token_type,
  386. token_url=token_url,
  387. token_info_url=token_info_url,
  388. service_account_impersonation_url=service_account_impersonation_url,
  389. credential_source=credential_source,
  390. subject_token_supplier=subject_token_supplier,
  391. client_id=client_id,
  392. client_secret=client_secret,
  393. quota_project_id=quota_project_id,
  394. scopes=scopes,
  395. default_scopes=default_scopes,
  396. workforce_pool_user_project=workforce_pool_user_project,
  397. )
  398. @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
  399. def test_from_info_full_options(self, mock_init):
  400. credentials = identity_pool.Credentials.from_info(
  401. {
  402. "audience": AUDIENCE,
  403. "subject_token_type": SUBJECT_TOKEN_TYPE,
  404. "token_url": TOKEN_URL,
  405. "token_info_url": TOKEN_INFO_URL,
  406. "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
  407. "service_account_impersonation": {"token_lifetime_seconds": 2800},
  408. "client_id": CLIENT_ID,
  409. "client_secret": CLIENT_SECRET,
  410. "quota_project_id": QUOTA_PROJECT_ID,
  411. "credential_source": self.CREDENTIAL_SOURCE_TEXT,
  412. }
  413. )
  414. # Confirm identity_pool.Credentials instantiated with expected attributes.
  415. assert isinstance(credentials, identity_pool.Credentials)
  416. mock_init.assert_called_once_with(
  417. audience=AUDIENCE,
  418. subject_token_type=SUBJECT_TOKEN_TYPE,
  419. token_url=TOKEN_URL,
  420. token_info_url=TOKEN_INFO_URL,
  421. service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
  422. service_account_impersonation_options={"token_lifetime_seconds": 2800},
  423. client_id=CLIENT_ID,
  424. client_secret=CLIENT_SECRET,
  425. credential_source=self.CREDENTIAL_SOURCE_TEXT,
  426. subject_token_supplier=None,
  427. quota_project_id=QUOTA_PROJECT_ID,
  428. workforce_pool_user_project=None,
  429. universe_domain=DEFAULT_UNIVERSE_DOMAIN,
  430. )
  431. @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
  432. def test_from_info_required_options_only(self, mock_init):
  433. credentials = identity_pool.Credentials.from_info(
  434. {
  435. "audience": AUDIENCE,
  436. "subject_token_type": SUBJECT_TOKEN_TYPE,
  437. "token_url": TOKEN_URL,
  438. "credential_source": self.CREDENTIAL_SOURCE_TEXT,
  439. }
  440. )
  441. # Confirm identity_pool.Credentials instantiated with expected attributes.
  442. assert isinstance(credentials, identity_pool.Credentials)
  443. mock_init.assert_called_once_with(
  444. audience=AUDIENCE,
  445. subject_token_type=SUBJECT_TOKEN_TYPE,
  446. token_url=TOKEN_URL,
  447. token_info_url=None,
  448. service_account_impersonation_url=None,
  449. service_account_impersonation_options={},
  450. client_id=None,
  451. client_secret=None,
  452. credential_source=self.CREDENTIAL_SOURCE_TEXT,
  453. subject_token_supplier=None,
  454. quota_project_id=None,
  455. workforce_pool_user_project=None,
  456. universe_domain=DEFAULT_UNIVERSE_DOMAIN,
  457. )
  458. @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
  459. def test_from_info_supplier(self, mock_init):
  460. supplier = TestSubjectTokenSupplier()
  461. credentials = identity_pool.Credentials.from_info(
  462. {
  463. "audience": AUDIENCE,
  464. "subject_token_type": SUBJECT_TOKEN_TYPE,
  465. "token_url": TOKEN_URL,
  466. "subject_token_supplier": supplier,
  467. }
  468. )
  469. # Confirm identity_pool.Credentials instantiated with expected attributes.
  470. assert isinstance(credentials, identity_pool.Credentials)
  471. mock_init.assert_called_once_with(
  472. audience=AUDIENCE,
  473. subject_token_type=SUBJECT_TOKEN_TYPE,
  474. token_url=TOKEN_URL,
  475. token_info_url=None,
  476. service_account_impersonation_url=None,
  477. service_account_impersonation_options={},
  478. client_id=None,
  479. client_secret=None,
  480. credential_source=None,
  481. subject_token_supplier=supplier,
  482. quota_project_id=None,
  483. workforce_pool_user_project=None,
  484. universe_domain=DEFAULT_UNIVERSE_DOMAIN,
  485. )
  486. @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
  487. def test_from_info_workforce_pool(self, mock_init):
  488. credentials = identity_pool.Credentials.from_info(
  489. {
  490. "audience": WORKFORCE_AUDIENCE,
  491. "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
  492. "token_url": TOKEN_URL,
  493. "credential_source": self.CREDENTIAL_SOURCE_TEXT,
  494. "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
  495. }
  496. )
  497. # Confirm identity_pool.Credentials instantiated with expected attributes.
  498. assert isinstance(credentials, identity_pool.Credentials)
  499. mock_init.assert_called_once_with(
  500. audience=WORKFORCE_AUDIENCE,
  501. subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
  502. token_url=TOKEN_URL,
  503. token_info_url=None,
  504. service_account_impersonation_url=None,
  505. service_account_impersonation_options={},
  506. client_id=None,
  507. client_secret=None,
  508. credential_source=self.CREDENTIAL_SOURCE_TEXT,
  509. subject_token_supplier=None,
  510. quota_project_id=None,
  511. workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
  512. universe_domain=DEFAULT_UNIVERSE_DOMAIN,
  513. )
  514. @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
  515. def test_from_file_full_options(self, mock_init, tmpdir):
  516. info = {
  517. "audience": AUDIENCE,
  518. "subject_token_type": SUBJECT_TOKEN_TYPE,
  519. "token_url": TOKEN_URL,
  520. "token_info_url": TOKEN_INFO_URL,
  521. "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
  522. "service_account_impersonation": {"token_lifetime_seconds": 2800},
  523. "client_id": CLIENT_ID,
  524. "client_secret": CLIENT_SECRET,
  525. "quota_project_id": QUOTA_PROJECT_ID,
  526. "credential_source": self.CREDENTIAL_SOURCE_TEXT,
  527. }
  528. config_file = tmpdir.join("config.json")
  529. config_file.write(json.dumps(info))
  530. credentials = identity_pool.Credentials.from_file(str(config_file))
  531. # Confirm identity_pool.Credentials instantiated with expected attributes.
  532. assert isinstance(credentials, identity_pool.Credentials)
  533. mock_init.assert_called_once_with(
  534. audience=AUDIENCE,
  535. subject_token_type=SUBJECT_TOKEN_TYPE,
  536. token_url=TOKEN_URL,
  537. token_info_url=TOKEN_INFO_URL,
  538. service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
  539. service_account_impersonation_options={"token_lifetime_seconds": 2800},
  540. client_id=CLIENT_ID,
  541. client_secret=CLIENT_SECRET,
  542. credential_source=self.CREDENTIAL_SOURCE_TEXT,
  543. subject_token_supplier=None,
  544. quota_project_id=QUOTA_PROJECT_ID,
  545. workforce_pool_user_project=None,
  546. universe_domain=DEFAULT_UNIVERSE_DOMAIN,
  547. )
  548. @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
  549. def test_from_file_required_options_only(self, mock_init, tmpdir):
  550. info = {
  551. "audience": AUDIENCE,
  552. "subject_token_type": SUBJECT_TOKEN_TYPE,
  553. "token_url": TOKEN_URL,
  554. "credential_source": self.CREDENTIAL_SOURCE_TEXT,
  555. }
  556. config_file = tmpdir.join("config.json")
  557. config_file.write(json.dumps(info))
  558. credentials = identity_pool.Credentials.from_file(str(config_file))
  559. # Confirm identity_pool.Credentials instantiated with expected attributes.
  560. assert isinstance(credentials, identity_pool.Credentials)
  561. mock_init.assert_called_once_with(
  562. audience=AUDIENCE,
  563. subject_token_type=SUBJECT_TOKEN_TYPE,
  564. token_url=TOKEN_URL,
  565. token_info_url=None,
  566. service_account_impersonation_url=None,
  567. service_account_impersonation_options={},
  568. client_id=None,
  569. client_secret=None,
  570. credential_source=self.CREDENTIAL_SOURCE_TEXT,
  571. subject_token_supplier=None,
  572. quota_project_id=None,
  573. workforce_pool_user_project=None,
  574. universe_domain=DEFAULT_UNIVERSE_DOMAIN,
  575. )
  576. @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
  577. def test_from_file_workforce_pool(self, mock_init, tmpdir):
  578. info = {
  579. "audience": WORKFORCE_AUDIENCE,
  580. "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
  581. "token_url": TOKEN_URL,
  582. "credential_source": self.CREDENTIAL_SOURCE_TEXT,
  583. "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
  584. }
  585. config_file = tmpdir.join("config.json")
  586. config_file.write(json.dumps(info))
  587. credentials = identity_pool.Credentials.from_file(str(config_file))
  588. # Confirm identity_pool.Credentials instantiated with expected attributes.
  589. assert isinstance(credentials, identity_pool.Credentials)
  590. mock_init.assert_called_once_with(
  591. audience=WORKFORCE_AUDIENCE,
  592. subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
  593. token_url=TOKEN_URL,
  594. token_info_url=None,
  595. service_account_impersonation_url=None,
  596. service_account_impersonation_options={},
  597. client_id=None,
  598. client_secret=None,
  599. credential_source=self.CREDENTIAL_SOURCE_TEXT,
  600. subject_token_supplier=None,
  601. quota_project_id=None,
  602. workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
  603. universe_domain=DEFAULT_UNIVERSE_DOMAIN,
  604. )
  605. def test_constructor_nonworkforce_with_workforce_pool_user_project(self):
  606. with pytest.raises(ValueError) as excinfo:
  607. self.make_credentials(
  608. audience=AUDIENCE,
  609. workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
  610. )
  611. assert excinfo.match(
  612. "workforce_pool_user_project should not be set for non-workforce "
  613. "pool credentials"
  614. )
  615. def test_constructor_invalid_options(self):
  616. credential_source = {"unsupported": "value"}
  617. with pytest.raises(ValueError) as excinfo:
  618. self.make_credentials(credential_source=credential_source)
  619. assert excinfo.match(r"Missing credential_source")
  620. def test_constructor_invalid_options_url_and_file(self):
  621. credential_source = {
  622. "url": self.CREDENTIAL_URL,
  623. "file": SUBJECT_TOKEN_TEXT_FILE,
  624. }
  625. with pytest.raises(ValueError) as excinfo:
  626. self.make_credentials(credential_source=credential_source)
  627. assert excinfo.match(r"Ambiguous credential_source")
  628. def test_constructor_invalid_options_url_and_certificate(self):
  629. credential_source = {
  630. "url": self.CREDENTIAL_URL,
  631. "certificate": {"certificate": {"use_default_certificate_config": True}},
  632. }
  633. with pytest.raises(ValueError) as excinfo:
  634. self.make_credentials(credential_source=credential_source)
  635. assert excinfo.match(r"Ambiguous credential_source")
  636. def test_constructor_invalid_options_file_and_certificate(self):
  637. credential_source = {
  638. "file": SUBJECT_TOKEN_TEXT_FILE,
  639. "certificate": {"certificate": {"use_default_certificate": True}},
  640. }
  641. with pytest.raises(ValueError) as excinfo:
  642. self.make_credentials(credential_source=credential_source)
  643. assert excinfo.match(r"Ambiguous credential_source")
  644. def test_constructor_invalid_options_url_file_and_certificate(self):
  645. credential_source = {
  646. "file": SUBJECT_TOKEN_TEXT_FILE,
  647. "url": self.CREDENTIAL_URL,
  648. "certificate": {"certificate": {"use_default_certificate": True}},
  649. }
  650. with pytest.raises(ValueError) as excinfo:
  651. self.make_credentials(credential_source=credential_source)
  652. assert excinfo.match(r"Ambiguous credential_source")
  653. def test_constructor_invalid_options_environment_id(self):
  654. credential_source = {"url": self.CREDENTIAL_URL, "environment_id": "aws1"}
  655. with pytest.raises(ValueError) as excinfo:
  656. self.make_credentials(credential_source=credential_source)
  657. assert excinfo.match(
  658. r"Invalid Identity Pool credential_source field 'environment_id'"
  659. )
  660. def test_constructor_invalid_credential_source(self):
  661. with pytest.raises(ValueError) as excinfo:
  662. self.make_credentials(credential_source="non-dict")
  663. assert excinfo.match(
  664. r"Invalid credential_source. The credential_source is not a dict."
  665. )
  666. def test_constructor_invalid_no_credential_source_or_supplier(self):
  667. with pytest.raises(ValueError) as excinfo:
  668. self.make_credentials()
  669. assert excinfo.match(
  670. r"A valid credential source or a subject token supplier must be provided."
  671. )
  672. def test_constructor_invalid_both_credential_source_and_supplier(self):
  673. supplier = TestSubjectTokenSupplier()
  674. with pytest.raises(ValueError) as excinfo:
  675. self.make_credentials(
  676. credential_source=self.CREDENTIAL_SOURCE_TEXT,
  677. subject_token_supplier=supplier,
  678. )
  679. assert excinfo.match(
  680. r"Identity pool credential cannot have both a credential source and a subject token supplier."
  681. )
  682. def test_constructor_invalid_credential_source_format_type(self):
  683. credential_source = {"file": "test.txt", "format": {"type": "xml"}}
  684. with pytest.raises(ValueError) as excinfo:
  685. self.make_credentials(credential_source=credential_source)
  686. assert excinfo.match(r"Invalid credential_source format 'xml'")
  687. def test_constructor_missing_subject_token_field_name(self):
  688. credential_source = {"file": "test.txt", "format": {"type": "json"}}
  689. with pytest.raises(ValueError) as excinfo:
  690. self.make_credentials(credential_source=credential_source)
  691. assert excinfo.match(
  692. r"Missing subject_token_field_name for JSON credential_source format"
  693. )
  694. def test_constructor_default_and_file_location_certificate(self):
  695. credential_source = {
  696. "certificate": {
  697. "use_default_certificate_config": True,
  698. "certificate_config_location": "test",
  699. }
  700. }
  701. with pytest.raises(ValueError) as excinfo:
  702. self.make_credentials(credential_source=credential_source)
  703. assert excinfo.match(r"Invalid certificate configuration")
  704. def test_constructor_no_default_or_file_location_certificate(self):
  705. credential_source = {"certificate": {"use_default_certificate_config": False}}
  706. with pytest.raises(ValueError) as excinfo:
  707. self.make_credentials(credential_source=credential_source)
  708. assert excinfo.match(r"Invalid certificate configuration")
  709. def test_info_with_workforce_pool_user_project(self):
  710. credentials = self.make_credentials(
  711. audience=WORKFORCE_AUDIENCE,
  712. subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
  713. credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy(),
  714. workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
  715. )
  716. assert credentials.info == {
  717. "type": "external_account",
  718. "audience": WORKFORCE_AUDIENCE,
  719. "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
  720. "token_url": TOKEN_URL,
  721. "token_info_url": TOKEN_INFO_URL,
  722. "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL,
  723. "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
  724. "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
  725. }
  726. def test_info_with_file_credential_source(self):
  727. credentials = self.make_credentials(
  728. credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy()
  729. )
  730. assert credentials.info == {
  731. "type": "external_account",
  732. "audience": AUDIENCE,
  733. "subject_token_type": SUBJECT_TOKEN_TYPE,
  734. "token_url": TOKEN_URL,
  735. "token_info_url": TOKEN_INFO_URL,
  736. "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL,
  737. "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
  738. }
  739. def test_info_with_url_credential_source(self):
  740. credentials = self.make_credentials(
  741. credential_source=self.CREDENTIAL_SOURCE_JSON_URL.copy()
  742. )
  743. assert credentials.info == {
  744. "type": "external_account",
  745. "audience": AUDIENCE,
  746. "subject_token_type": SUBJECT_TOKEN_TYPE,
  747. "token_url": TOKEN_URL,
  748. "token_info_url": TOKEN_INFO_URL,
  749. "credential_source": self.CREDENTIAL_SOURCE_JSON_URL,
  750. "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
  751. }
  752. def test_info_with_certificate_credential_source(self):
  753. credentials = self.make_credentials(
  754. credential_source=self.CREDENTIAL_SOURCE_CERTIFICATE.copy()
  755. )
  756. assert credentials.info == {
  757. "type": "external_account",
  758. "audience": AUDIENCE,
  759. "subject_token_type": SUBJECT_TOKEN_TYPE,
  760. "token_url": TOKEN_URL,
  761. "token_info_url": TOKEN_INFO_URL,
  762. "credential_source": self.CREDENTIAL_SOURCE_CERTIFICATE,
  763. "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
  764. }
  765. def test_info_with_non_default_certificate_credential_source(self):
  766. credentials = self.make_credentials(
  767. credential_source=self.CREDENTIAL_SOURCE_CERTIFICATE_NOT_DEFAULT.copy()
  768. )
  769. assert credentials.info == {
  770. "type": "external_account",
  771. "audience": AUDIENCE,
  772. "subject_token_type": SUBJECT_TOKEN_TYPE,
  773. "token_url": TOKEN_URL,
  774. "token_info_url": TOKEN_INFO_URL,
  775. "credential_source": self.CREDENTIAL_SOURCE_CERTIFICATE_NOT_DEFAULT,
  776. "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
  777. }
  778. def test_info_with_default_token_url(self):
  779. credentials = identity_pool.Credentials(
  780. audience=AUDIENCE,
  781. subject_token_type=SUBJECT_TOKEN_TYPE,
  782. credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy(),
  783. )
  784. assert credentials.info == {
  785. "type": "external_account",
  786. "audience": AUDIENCE,
  787. "subject_token_type": SUBJECT_TOKEN_TYPE,
  788. "token_url": TOKEN_URL,
  789. "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL,
  790. "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
  791. }
  792. def test_info_with_default_token_url_with_universe_domain(self):
  793. credentials = identity_pool.Credentials(
  794. audience=AUDIENCE,
  795. subject_token_type=SUBJECT_TOKEN_TYPE,
  796. credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy(),
  797. universe_domain="testdomain.org",
  798. )
  799. assert credentials.info == {
  800. "type": "external_account",
  801. "audience": AUDIENCE,
  802. "subject_token_type": SUBJECT_TOKEN_TYPE,
  803. "token_url": "https://sts.testdomain.org/v1/token",
  804. "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL,
  805. "universe_domain": "testdomain.org",
  806. }
  807. def test_retrieve_subject_token_missing_subject_token(self, tmpdir):
  808. # Provide empty text file.
  809. empty_file = tmpdir.join("empty.txt")
  810. empty_file.write("")
  811. credential_source = {"file": str(empty_file)}
  812. credentials = self.make_credentials(credential_source=credential_source)
  813. with pytest.raises(exceptions.RefreshError) as excinfo:
  814. credentials.retrieve_subject_token(None)
  815. assert excinfo.match(r"Missing subject_token in the credential_source file")
  816. def test_retrieve_subject_token_text_file(self):
  817. credentials = self.make_credentials(
  818. credential_source=self.CREDENTIAL_SOURCE_TEXT
  819. )
  820. subject_token = credentials.retrieve_subject_token(None)
  821. assert subject_token == TEXT_FILE_SUBJECT_TOKEN
  822. def test_retrieve_subject_token_json_file(self):
  823. credentials = self.make_credentials(
  824. credential_source=self.CREDENTIAL_SOURCE_JSON
  825. )
  826. subject_token = credentials.retrieve_subject_token(None)
  827. assert subject_token == JSON_FILE_SUBJECT_TOKEN
  828. def test_retrieve_subject_token_certificate(self):
  829. credentials = self.make_credentials(
  830. credential_source=self.CREDENTIAL_SOURCE_CERTIFICATE
  831. )
  832. subject_token = credentials.retrieve_subject_token(None)
  833. assert subject_token == ""
  834. def test_retrieve_subject_token_json_file_invalid_field_name(self):
  835. credential_source = {
  836. "file": SUBJECT_TOKEN_JSON_FILE,
  837. "format": {"type": "json", "subject_token_field_name": "not_found"},
  838. }
  839. credentials = self.make_credentials(credential_source=credential_source)
  840. with pytest.raises(exceptions.RefreshError) as excinfo:
  841. credentials.retrieve_subject_token(None)
  842. assert excinfo.match(
  843. "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
  844. SUBJECT_TOKEN_JSON_FILE, "not_found"
  845. )
  846. )
  847. def test_retrieve_subject_token_invalid_json(self, tmpdir):
  848. # Provide JSON file. This should result in JSON parsing error.
  849. invalid_json_file = tmpdir.join("invalid.json")
  850. invalid_json_file.write("{")
  851. credential_source = {
  852. "file": str(invalid_json_file),
  853. "format": {"type": "json", "subject_token_field_name": "access_token"},
  854. }
  855. credentials = self.make_credentials(credential_source=credential_source)
  856. with pytest.raises(exceptions.RefreshError) as excinfo:
  857. credentials.retrieve_subject_token(None)
  858. assert excinfo.match(
  859. "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
  860. str(invalid_json_file), "access_token"
  861. )
  862. )
  863. def test_retrieve_subject_token_file_not_found(self):
  864. credential_source = {"file": "./not_found.txt"}
  865. credentials = self.make_credentials(credential_source=credential_source)
  866. with pytest.raises(exceptions.RefreshError) as excinfo:
  867. credentials.retrieve_subject_token(None)
  868. assert excinfo.match(r"File './not_found.txt' was not found")
  869. def test_token_info_url(self):
  870. credentials = self.make_credentials(
  871. credential_source=self.CREDENTIAL_SOURCE_JSON
  872. )
  873. assert credentials.token_info_url == TOKEN_INFO_URL
  874. def test_token_info_url_custom(self):
  875. for url in VALID_TOKEN_URLS:
  876. credentials = self.make_credentials(
  877. credential_source=self.CREDENTIAL_SOURCE_JSON.copy(),
  878. token_info_url=(url + "/introspect"),
  879. )
  880. assert credentials.token_info_url == url + "/introspect"
  881. def test_token_info_url_negative(self):
  882. credentials = self.make_credentials(
  883. credential_source=self.CREDENTIAL_SOURCE_JSON.copy(), token_info_url=None
  884. )
  885. assert not credentials.token_info_url
  886. def test_token_url_custom(self):
  887. for url in VALID_TOKEN_URLS:
  888. credentials = self.make_credentials(
  889. credential_source=self.CREDENTIAL_SOURCE_JSON.copy(),
  890. token_url=(url + "/token"),
  891. )
  892. assert credentials._token_url == (url + "/token")
  893. def test_service_account_impersonation_url_custom(self):
  894. for url in VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS:
  895. credentials = self.make_credentials(
  896. credential_source=self.CREDENTIAL_SOURCE_JSON.copy(),
  897. service_account_impersonation_url=(
  898. url + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE
  899. ),
  900. )
  901. assert credentials._service_account_impersonation_url == (
  902. url + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE
  903. )
  904. def test_refresh_text_file_success_without_impersonation_ignore_default_scopes(
  905. self,
  906. ):
  907. credentials = self.make_credentials(
  908. client_id=CLIENT_ID,
  909. client_secret=CLIENT_SECRET,
  910. # Test with text format type.
  911. credential_source=self.CREDENTIAL_SOURCE_TEXT,
  912. scopes=SCOPES,
  913. # Default scopes should be ignored.
  914. default_scopes=["ignored"],
  915. )
  916. self.assert_underlying_credentials_refresh(
  917. credentials=credentials,
  918. audience=AUDIENCE,
  919. subject_token=TEXT_FILE_SUBJECT_TOKEN,
  920. subject_token_type=SUBJECT_TOKEN_TYPE,
  921. token_url=TOKEN_URL,
  922. service_account_impersonation_url=None,
  923. basic_auth_encoding=BASIC_AUTH_ENCODING,
  924. quota_project_id=None,
  925. used_scopes=SCOPES,
  926. scopes=SCOPES,
  927. default_scopes=["ignored"],
  928. )
  929. def test_refresh_workforce_success_with_client_auth_without_impersonation(self):
  930. credentials = self.make_credentials(
  931. audience=WORKFORCE_AUDIENCE,
  932. subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
  933. client_id=CLIENT_ID,
  934. client_secret=CLIENT_SECRET,
  935. # Test with text format type.
  936. credential_source=self.CREDENTIAL_SOURCE_TEXT,
  937. scopes=SCOPES,
  938. # This will be ignored in favor of client auth.
  939. workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
  940. )
  941. self.assert_underlying_credentials_refresh(
  942. credentials=credentials,
  943. audience=WORKFORCE_AUDIENCE,
  944. subject_token=TEXT_FILE_SUBJECT_TOKEN,
  945. subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
  946. token_url=TOKEN_URL,
  947. service_account_impersonation_url=None,
  948. basic_auth_encoding=BASIC_AUTH_ENCODING,
  949. quota_project_id=None,
  950. used_scopes=SCOPES,
  951. scopes=SCOPES,
  952. workforce_pool_user_project=None,
  953. )
  954. def test_refresh_workforce_success_with_client_auth_and_no_workforce_project(self):
  955. credentials = self.make_credentials(
  956. audience=WORKFORCE_AUDIENCE,
  957. subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
  958. client_id=CLIENT_ID,
  959. client_secret=CLIENT_SECRET,
  960. # Test with text format type.
  961. credential_source=self.CREDENTIAL_SOURCE_TEXT,
  962. scopes=SCOPES,
  963. # This is not needed when client Auth is used.
  964. workforce_pool_user_project=None,
  965. )
  966. self.assert_underlying_credentials_refresh(
  967. credentials=credentials,
  968. audience=WORKFORCE_AUDIENCE,
  969. subject_token=TEXT_FILE_SUBJECT_TOKEN,
  970. subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
  971. token_url=TOKEN_URL,
  972. service_account_impersonation_url=None,
  973. basic_auth_encoding=BASIC_AUTH_ENCODING,
  974. quota_project_id=None,
  975. used_scopes=SCOPES,
  976. scopes=SCOPES,
  977. workforce_pool_user_project=None,
  978. )
  979. def test_refresh_workforce_success_without_client_auth_without_impersonation(self):
  980. credentials = self.make_credentials(
  981. audience=WORKFORCE_AUDIENCE,
  982. subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
  983. client_id=None,
  984. client_secret=None,
  985. # Test with text format type.
  986. credential_source=self.CREDENTIAL_SOURCE_TEXT,
  987. scopes=SCOPES,
  988. # This will not be ignored as client auth is not used.
  989. workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
  990. )
  991. self.assert_underlying_credentials_refresh(
  992. credentials=credentials,
  993. audience=WORKFORCE_AUDIENCE,
  994. subject_token=TEXT_FILE_SUBJECT_TOKEN,
  995. subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
  996. token_url=TOKEN_URL,
  997. service_account_impersonation_url=None,
  998. basic_auth_encoding=None,
  999. quota_project_id=None,
  1000. used_scopes=SCOPES,
  1001. scopes=SCOPES,
  1002. workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
  1003. )
  1004. def test_refresh_workforce_success_without_client_auth_with_impersonation(self):
  1005. credentials = self.make_credentials(
  1006. audience=WORKFORCE_AUDIENCE,
  1007. subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
  1008. client_id=None,
  1009. client_secret=None,
  1010. service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
  1011. # Test with text format type.
  1012. credential_source=self.CREDENTIAL_SOURCE_TEXT,
  1013. scopes=SCOPES,
  1014. # This will not be ignored as client auth is not used.
  1015. workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
  1016. )
  1017. self.assert_underlying_credentials_refresh(
  1018. credentials=credentials,
  1019. audience=WORKFORCE_AUDIENCE,
  1020. subject_token=TEXT_FILE_SUBJECT_TOKEN,
  1021. subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
  1022. token_url=TOKEN_URL,
  1023. service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
  1024. basic_auth_encoding=None,
  1025. quota_project_id=None,
  1026. used_scopes=SCOPES,
  1027. scopes=SCOPES,
  1028. workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
  1029. )
  1030. def test_refresh_text_file_success_without_impersonation_use_default_scopes(self):
  1031. credentials = self.make_credentials(
  1032. client_id=CLIENT_ID,
  1033. client_secret=CLIENT_SECRET,
  1034. # Test with text format type.
  1035. credential_source=self.CREDENTIAL_SOURCE_TEXT,
  1036. scopes=None,
  1037. # Default scopes should be used since user specified scopes are none.
  1038. default_scopes=SCOPES,
  1039. )
  1040. self.assert_underlying_credentials_refresh(
  1041. credentials=credentials,
  1042. audience=AUDIENCE,
  1043. subject_token=TEXT_FILE_SUBJECT_TOKEN,
  1044. subject_token_type=SUBJECT_TOKEN_TYPE,
  1045. token_url=TOKEN_URL,
  1046. service_account_impersonation_url=None,
  1047. basic_auth_encoding=BASIC_AUTH_ENCODING,
  1048. quota_project_id=None,
  1049. used_scopes=SCOPES,
  1050. scopes=None,
  1051. default_scopes=SCOPES,
  1052. )
  1053. def test_refresh_text_file_success_with_impersonation_ignore_default_scopes(self):
  1054. # Initialize credentials with service account impersonation and basic auth.
  1055. credentials = self.make_credentials(
  1056. # Test with text format type.
  1057. credential_source=self.CREDENTIAL_SOURCE_TEXT,
  1058. service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
  1059. scopes=SCOPES,
  1060. # Default scopes should be ignored.
  1061. default_scopes=["ignored"],
  1062. )
  1063. self.assert_underlying_credentials_refresh(
  1064. credentials=credentials,
  1065. audience=AUDIENCE,
  1066. subject_token=TEXT_FILE_SUBJECT_TOKEN,
  1067. subject_token_type=SUBJECT_TOKEN_TYPE,
  1068. token_url=TOKEN_URL,
  1069. service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
  1070. basic_auth_encoding=None,
  1071. quota_project_id=None,
  1072. used_scopes=SCOPES,
  1073. scopes=SCOPES,
  1074. default_scopes=["ignored"],
  1075. )
  1076. def test_refresh_text_file_success_with_impersonation_use_default_scopes(self):
  1077. # Initialize credentials with service account impersonation, basic auth
  1078. # and default scopes (no user scopes).
  1079. credentials = self.make_credentials(
  1080. # Test with text format type.
  1081. credential_source=self.CREDENTIAL_SOURCE_TEXT,
  1082. service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
  1083. scopes=None,
  1084. # Default scopes should be used since user specified scopes are none.
  1085. default_scopes=SCOPES,
  1086. )
  1087. self.assert_underlying_credentials_refresh(
  1088. credentials=credentials,
  1089. audience=AUDIENCE,
  1090. subject_token=TEXT_FILE_SUBJECT_TOKEN,
  1091. subject_token_type=SUBJECT_TOKEN_TYPE,
  1092. token_url=TOKEN_URL,
  1093. service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
  1094. basic_auth_encoding=None,
  1095. quota_project_id=None,
  1096. used_scopes=SCOPES,
  1097. scopes=None,
  1098. default_scopes=SCOPES,
  1099. )
  1100. def test_refresh_json_file_success_without_impersonation(self):
  1101. credentials = self.make_credentials(
  1102. client_id=CLIENT_ID,
  1103. client_secret=CLIENT_SECRET,
  1104. # Test with JSON format type.
  1105. credential_source=self.CREDENTIAL_SOURCE_JSON,
  1106. scopes=SCOPES,
  1107. )
  1108. self.assert_underlying_credentials_refresh(
  1109. credentials=credentials,
  1110. audience=AUDIENCE,
  1111. subject_token=JSON_FILE_SUBJECT_TOKEN,
  1112. subject_token_type=SUBJECT_TOKEN_TYPE,
  1113. token_url=TOKEN_URL,
  1114. service_account_impersonation_url=None,
  1115. basic_auth_encoding=BASIC_AUTH_ENCODING,
  1116. quota_project_id=None,
  1117. used_scopes=SCOPES,
  1118. scopes=SCOPES,
  1119. default_scopes=None,
  1120. )
  1121. def test_refresh_json_file_success_with_impersonation(self):
  1122. # Initialize credentials with service account impersonation and basic auth.
  1123. credentials = self.make_credentials(
  1124. # Test with JSON format type.
  1125. credential_source=self.CREDENTIAL_SOURCE_JSON,
  1126. service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
  1127. scopes=SCOPES,
  1128. )
  1129. self.assert_underlying_credentials_refresh(
  1130. credentials=credentials,
  1131. audience=AUDIENCE,
  1132. subject_token=JSON_FILE_SUBJECT_TOKEN,
  1133. subject_token_type=SUBJECT_TOKEN_TYPE,
  1134. token_url=TOKEN_URL,
  1135. service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
  1136. basic_auth_encoding=None,
  1137. quota_project_id=None,
  1138. used_scopes=SCOPES,
  1139. scopes=SCOPES,
  1140. default_scopes=None,
  1141. )
  1142. def test_refresh_with_retrieve_subject_token_error(self):
  1143. credential_source = {
  1144. "file": SUBJECT_TOKEN_JSON_FILE,
  1145. "format": {"type": "json", "subject_token_field_name": "not_found"},
  1146. }
  1147. credentials = self.make_credentials(credential_source=credential_source)
  1148. with pytest.raises(exceptions.RefreshError) as excinfo:
  1149. credentials.refresh(None)
  1150. assert excinfo.match(
  1151. "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
  1152. SUBJECT_TOKEN_JSON_FILE, "not_found"
  1153. )
  1154. )
  1155. def test_retrieve_subject_token_from_url(self):
  1156. credentials = self.make_credentials(
  1157. credential_source=self.CREDENTIAL_SOURCE_TEXT_URL
  1158. )
  1159. request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN)
  1160. subject_token = credentials.retrieve_subject_token(request)
  1161. assert subject_token == TEXT_FILE_SUBJECT_TOKEN
  1162. self.assert_credential_request_kwargs(request.call_args_list[0][1], None)
  1163. def test_retrieve_subject_token_from_url_with_headers(self):
  1164. credentials = self.make_credentials(
  1165. credential_source={"url": self.CREDENTIAL_URL, "headers": {"foo": "bar"}}
  1166. )
  1167. request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN)
  1168. subject_token = credentials.retrieve_subject_token(request)
  1169. assert subject_token == TEXT_FILE_SUBJECT_TOKEN
  1170. self.assert_credential_request_kwargs(
  1171. request.call_args_list[0][1], {"foo": "bar"}
  1172. )
  1173. def test_retrieve_subject_token_from_url_json(self):
  1174. credentials = self.make_credentials(
  1175. credential_source=self.CREDENTIAL_SOURCE_JSON_URL
  1176. )
  1177. request = self.make_mock_request(token_data=JSON_FILE_CONTENT)
  1178. subject_token = credentials.retrieve_subject_token(request)
  1179. assert subject_token == JSON_FILE_SUBJECT_TOKEN
  1180. self.assert_credential_request_kwargs(request.call_args_list[0][1], None)
  1181. def test_retrieve_subject_token_from_url_json_with_headers(self):
  1182. credentials = self.make_credentials(
  1183. credential_source={
  1184. "url": self.CREDENTIAL_URL,
  1185. "format": {"type": "json", "subject_token_field_name": "access_token"},
  1186. "headers": {"foo": "bar"},
  1187. }
  1188. )
  1189. request = self.make_mock_request(token_data=JSON_FILE_CONTENT)
  1190. subject_token = credentials.retrieve_subject_token(request)
  1191. assert subject_token == JSON_FILE_SUBJECT_TOKEN
  1192. self.assert_credential_request_kwargs(
  1193. request.call_args_list[0][1], {"foo": "bar"}
  1194. )
  1195. def test_retrieve_subject_token_from_url_not_found(self):
  1196. credentials = self.make_credentials(
  1197. credential_source=self.CREDENTIAL_SOURCE_TEXT_URL
  1198. )
  1199. with pytest.raises(exceptions.RefreshError) as excinfo:
  1200. credentials.retrieve_subject_token(
  1201. self.make_mock_request(token_status=404, token_data=JSON_FILE_CONTENT)
  1202. )
  1203. assert excinfo.match("Unable to retrieve Identity Pool subject token")
  1204. def test_retrieve_subject_token_from_url_json_invalid_field(self):
  1205. credential_source = {
  1206. "url": self.CREDENTIAL_URL,
  1207. "format": {"type": "json", "subject_token_field_name": "not_found"},
  1208. }
  1209. credentials = self.make_credentials(credential_source=credential_source)
  1210. with pytest.raises(exceptions.RefreshError) as excinfo:
  1211. credentials.retrieve_subject_token(
  1212. self.make_mock_request(token_data=JSON_FILE_CONTENT)
  1213. )
  1214. assert excinfo.match(
  1215. "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
  1216. self.CREDENTIAL_URL, "not_found"
  1217. )
  1218. )
  1219. def test_retrieve_subject_token_from_url_json_invalid_format(self):
  1220. credentials = self.make_credentials(
  1221. credential_source=self.CREDENTIAL_SOURCE_JSON_URL
  1222. )
  1223. with pytest.raises(exceptions.RefreshError) as excinfo:
  1224. credentials.retrieve_subject_token(self.make_mock_request(token_data="{"))
  1225. assert excinfo.match(
  1226. "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
  1227. self.CREDENTIAL_URL, "access_token"
  1228. )
  1229. )
  1230. def test_refresh_text_file_success_without_impersonation_url(self):
  1231. credentials = self.make_credentials(
  1232. client_id=CLIENT_ID,
  1233. client_secret=CLIENT_SECRET,
  1234. # Test with text format type.
  1235. credential_source=self.CREDENTIAL_SOURCE_TEXT_URL,
  1236. scopes=SCOPES,
  1237. )
  1238. self.assert_underlying_credentials_refresh(
  1239. credentials=credentials,
  1240. audience=AUDIENCE,
  1241. subject_token=TEXT_FILE_SUBJECT_TOKEN,
  1242. subject_token_type=SUBJECT_TOKEN_TYPE,
  1243. token_url=TOKEN_URL,
  1244. service_account_impersonation_url=None,
  1245. basic_auth_encoding=BASIC_AUTH_ENCODING,
  1246. quota_project_id=None,
  1247. used_scopes=SCOPES,
  1248. scopes=SCOPES,
  1249. default_scopes=None,
  1250. credential_data=TEXT_FILE_SUBJECT_TOKEN,
  1251. )
  1252. def test_refresh_text_file_success_with_impersonation_url(self):
  1253. # Initialize credentials with service account impersonation and basic auth.
  1254. credentials = self.make_credentials(
  1255. # Test with text format type.
  1256. credential_source=self.CREDENTIAL_SOURCE_TEXT_URL,
  1257. service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
  1258. scopes=SCOPES,
  1259. )
  1260. self.assert_underlying_credentials_refresh(
  1261. credentials=credentials,
  1262. audience=AUDIENCE,
  1263. subject_token=TEXT_FILE_SUBJECT_TOKEN,
  1264. subject_token_type=SUBJECT_TOKEN_TYPE,
  1265. token_url=TOKEN_URL,
  1266. service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
  1267. basic_auth_encoding=None,
  1268. quota_project_id=None,
  1269. used_scopes=SCOPES,
  1270. scopes=SCOPES,
  1271. default_scopes=None,
  1272. credential_data=TEXT_FILE_SUBJECT_TOKEN,
  1273. )
  1274. def test_refresh_json_file_success_without_impersonation_url(self):
  1275. credentials = self.make_credentials(
  1276. client_id=CLIENT_ID,
  1277. client_secret=CLIENT_SECRET,
  1278. # Test with JSON format type.
  1279. credential_source=self.CREDENTIAL_SOURCE_JSON_URL,
  1280. scopes=SCOPES,
  1281. )
  1282. self.assert_underlying_credentials_refresh(
  1283. credentials=credentials,
  1284. audience=AUDIENCE,
  1285. subject_token=JSON_FILE_SUBJECT_TOKEN,
  1286. subject_token_type=SUBJECT_TOKEN_TYPE,
  1287. token_url=TOKEN_URL,
  1288. service_account_impersonation_url=None,
  1289. basic_auth_encoding=BASIC_AUTH_ENCODING,
  1290. quota_project_id=None,
  1291. used_scopes=SCOPES,
  1292. scopes=SCOPES,
  1293. default_scopes=None,
  1294. credential_data=JSON_FILE_CONTENT,
  1295. )
  1296. def test_refresh_json_file_success_with_impersonation_url(self):
  1297. # Initialize credentials with service account impersonation and basic auth.
  1298. credentials = self.make_credentials(
  1299. # Test with JSON format type.
  1300. credential_source=self.CREDENTIAL_SOURCE_JSON_URL,
  1301. service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
  1302. scopes=SCOPES,
  1303. )
  1304. self.assert_underlying_credentials_refresh(
  1305. credentials=credentials,
  1306. audience=AUDIENCE,
  1307. subject_token=JSON_FILE_SUBJECT_TOKEN,
  1308. subject_token_type=SUBJECT_TOKEN_TYPE,
  1309. token_url=TOKEN_URL,
  1310. service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
  1311. basic_auth_encoding=None,
  1312. quota_project_id=None,
  1313. used_scopes=SCOPES,
  1314. scopes=SCOPES,
  1315. default_scopes=None,
  1316. credential_data=JSON_FILE_CONTENT,
  1317. )
  1318. def test_refresh_with_retrieve_subject_token_error_url(self):
  1319. credential_source = {
  1320. "url": self.CREDENTIAL_URL,
  1321. "format": {"type": "json", "subject_token_field_name": "not_found"},
  1322. }
  1323. credentials = self.make_credentials(credential_source=credential_source)
  1324. with pytest.raises(exceptions.RefreshError) as excinfo:
  1325. credentials.refresh(self.make_mock_request(token_data=JSON_FILE_CONTENT))
  1326. assert excinfo.match(
  1327. "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
  1328. self.CREDENTIAL_URL, "not_found"
  1329. )
  1330. )
  1331. def test_retrieve_subject_token_supplier(self):
  1332. supplier = TestSubjectTokenSupplier(subject_token=JSON_FILE_SUBJECT_TOKEN)
  1333. credentials = self.make_credentials(subject_token_supplier=supplier)
  1334. subject_token = credentials.retrieve_subject_token(None)
  1335. assert subject_token == JSON_FILE_SUBJECT_TOKEN
  1336. def test_retrieve_subject_token_supplier_correct_context(self):
  1337. supplier = TestSubjectTokenSupplier(
  1338. subject_token=JSON_FILE_SUBJECT_TOKEN,
  1339. expected_context=external_account.SupplierContext(
  1340. SUBJECT_TOKEN_TYPE, AUDIENCE
  1341. ),
  1342. )
  1343. credentials = self.make_credentials(subject_token_supplier=supplier)
  1344. credentials.retrieve_subject_token(None)
  1345. def test_retrieve_subject_token_supplier_error(self):
  1346. expected_exception = exceptions.RefreshError("test error")
  1347. supplier = TestSubjectTokenSupplier(subject_token_exception=expected_exception)
  1348. credentials = self.make_credentials(subject_token_supplier=supplier)
  1349. with pytest.raises(exceptions.RefreshError) as excinfo:
  1350. credentials.refresh(self.make_mock_request(token_data=JSON_FILE_CONTENT))
  1351. assert excinfo.match("test error")
  1352. def test_refresh_success_supplier_with_impersonation_url(self):
  1353. # Initialize credentials with service account impersonation and a supplier.
  1354. supplier = TestSubjectTokenSupplier(subject_token=JSON_FILE_SUBJECT_TOKEN)
  1355. credentials = self.make_credentials(
  1356. subject_token_supplier=supplier,
  1357. service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
  1358. scopes=SCOPES,
  1359. )
  1360. self.assert_underlying_credentials_refresh(
  1361. credentials=credentials,
  1362. audience=AUDIENCE,
  1363. subject_token=TEXT_FILE_SUBJECT_TOKEN,
  1364. subject_token_type=SUBJECT_TOKEN_TYPE,
  1365. token_url=TOKEN_URL,
  1366. service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
  1367. basic_auth_encoding=None,
  1368. quota_project_id=None,
  1369. used_scopes=SCOPES,
  1370. scopes=SCOPES,
  1371. default_scopes=None,
  1372. )
  1373. def test_refresh_success_supplier_without_impersonation_url(self):
  1374. # Initialize supplier credentials without service account impersonation.
  1375. supplier = TestSubjectTokenSupplier(subject_token=JSON_FILE_SUBJECT_TOKEN)
  1376. credentials = self.make_credentials(
  1377. subject_token_supplier=supplier, scopes=SCOPES
  1378. )
  1379. self.assert_underlying_credentials_refresh(
  1380. credentials=credentials,
  1381. audience=AUDIENCE,
  1382. subject_token=TEXT_FILE_SUBJECT_TOKEN,
  1383. subject_token_type=SUBJECT_TOKEN_TYPE,
  1384. token_url=TOKEN_URL,
  1385. basic_auth_encoding=None,
  1386. quota_project_id=None,
  1387. used_scopes=SCOPES,
  1388. scopes=SCOPES,
  1389. default_scopes=None,
  1390. )
  1391. @mock.patch(
  1392. "google.auth.transport._mtls_helper._get_workload_cert_and_key_paths",
  1393. return_value=("cert", "key"),
  1394. )
  1395. def test_get_mtls_certs(self, mock_get_workload_cert_and_key_paths):
  1396. credentials = self.make_credentials(
  1397. credential_source=self.CREDENTIAL_SOURCE_CERTIFICATE.copy()
  1398. )
  1399. cert, key = credentials._get_mtls_cert_and_key_paths()
  1400. assert cert == "cert"
  1401. assert key == "key"
  1402. def test_get_mtls_certs_invalid(self):
  1403. credentials = self.make_credentials(
  1404. credential_source=self.CREDENTIAL_SOURCE_TEXT.copy()
  1405. )
  1406. with pytest.raises(exceptions.RefreshError) as excinfo:
  1407. credentials._get_mtls_cert_and_key_paths()
  1408. assert excinfo.match(
  1409. 'The credential is not configured to use mtls requests. The credential should include a "certificate" section in the credential source.'
  1410. )