test_downscoped.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794
  1. # Copyright 2021 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import datetime
  15. import http.client as http_client
  16. import json
  17. import urllib
  18. import mock
  19. import pytest # type: ignore
  20. from google.auth import _helpers
  21. from google.auth import credentials
  22. from google.auth import downscoped
  23. from google.auth import exceptions
  24. from google.auth import transport
  25. from google.auth.credentials import DEFAULT_UNIVERSE_DOMAIN
  26. from google.auth.credentials import TokenState
  27. EXPRESSION = (
  28. "resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-a')"
  29. )
  30. TITLE = "customer-a-objects"
  31. DESCRIPTION = (
  32. "Condition to make permissions available for objects starting with customer-a"
  33. )
  34. AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/example-bucket"
  35. AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectViewer"]
  36. OTHER_EXPRESSION = (
  37. "resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-b')"
  38. )
  39. OTHER_TITLE = "customer-b-objects"
  40. OTHER_DESCRIPTION = (
  41. "Condition to make permissions available for objects starting with customer-b"
  42. )
  43. OTHER_AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/other-bucket"
  44. OTHER_AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectCreator"]
  45. QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
  46. GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
  47. REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
  48. TOKEN_EXCHANGE_ENDPOINT = "https://sts.googleapis.com/v1/token"
  49. SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
  50. SUCCESS_RESPONSE = {
  51. "access_token": "ACCESS_TOKEN",
  52. "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
  53. "token_type": "Bearer",
  54. "expires_in": 3600,
  55. }
  56. ERROR_RESPONSE = {
  57. "error": "invalid_grant",
  58. "error_description": "Subject token is invalid.",
  59. "error_uri": "https://tools.ietf.org/html/rfc6749",
  60. }
  61. CREDENTIAL_ACCESS_BOUNDARY_JSON = {
  62. "accessBoundary": {
  63. "accessBoundaryRules": [
  64. {
  65. "availablePermissions": AVAILABLE_PERMISSIONS,
  66. "availableResource": AVAILABLE_RESOURCE,
  67. "availabilityCondition": {
  68. "expression": EXPRESSION,
  69. "title": TITLE,
  70. "description": DESCRIPTION,
  71. },
  72. }
  73. ]
  74. }
  75. }
  76. class SourceCredentials(credentials.Credentials):
  77. def __init__(self, raise_error=False, expires_in=3600):
  78. super(SourceCredentials, self).__init__()
  79. self._counter = 0
  80. self._raise_error = raise_error
  81. self._expires_in = expires_in
  82. def refresh(self, request):
  83. if self._raise_error:
  84. raise exceptions.RefreshError(
  85. "Failed to refresh access token in source credentials."
  86. )
  87. now = _helpers.utcnow()
  88. self._counter += 1
  89. self.token = "ACCESS_TOKEN_{}".format(self._counter)
  90. self.expiry = now + datetime.timedelta(seconds=self._expires_in)
  91. def make_availability_condition(expression, title=None, description=None):
  92. return downscoped.AvailabilityCondition(expression, title, description)
  93. def make_access_boundary_rule(
  94. available_resource, available_permissions, availability_condition=None
  95. ):
  96. return downscoped.AccessBoundaryRule(
  97. available_resource, available_permissions, availability_condition
  98. )
  99. def make_credential_access_boundary(rules):
  100. return downscoped.CredentialAccessBoundary(rules)
  101. class TestAvailabilityCondition(object):
  102. def test_constructor(self):
  103. availability_condition = make_availability_condition(
  104. EXPRESSION, TITLE, DESCRIPTION
  105. )
  106. assert availability_condition.expression == EXPRESSION
  107. assert availability_condition.title == TITLE
  108. assert availability_condition.description == DESCRIPTION
  109. def test_constructor_required_params_only(self):
  110. availability_condition = make_availability_condition(EXPRESSION)
  111. assert availability_condition.expression == EXPRESSION
  112. assert availability_condition.title is None
  113. assert availability_condition.description is None
  114. def test_setters(self):
  115. availability_condition = make_availability_condition(
  116. EXPRESSION, TITLE, DESCRIPTION
  117. )
  118. availability_condition.expression = OTHER_EXPRESSION
  119. availability_condition.title = OTHER_TITLE
  120. availability_condition.description = OTHER_DESCRIPTION
  121. assert availability_condition.expression == OTHER_EXPRESSION
  122. assert availability_condition.title == OTHER_TITLE
  123. assert availability_condition.description == OTHER_DESCRIPTION
  124. def test_invalid_expression_type(self):
  125. with pytest.raises(TypeError) as excinfo:
  126. make_availability_condition([EXPRESSION], TITLE, DESCRIPTION)
  127. assert excinfo.match("The provided expression is not a string.")
  128. def test_invalid_title_type(self):
  129. with pytest.raises(TypeError) as excinfo:
  130. make_availability_condition(EXPRESSION, False, DESCRIPTION)
  131. assert excinfo.match("The provided title is not a string or None.")
  132. def test_invalid_description_type(self):
  133. with pytest.raises(TypeError) as excinfo:
  134. make_availability_condition(EXPRESSION, TITLE, False)
  135. assert excinfo.match("The provided description is not a string or None.")
  136. def test_to_json_required_params_only(self):
  137. availability_condition = make_availability_condition(EXPRESSION)
  138. assert availability_condition.to_json() == {"expression": EXPRESSION}
  139. def test_to_json_(self):
  140. availability_condition = make_availability_condition(
  141. EXPRESSION, TITLE, DESCRIPTION
  142. )
  143. assert availability_condition.to_json() == {
  144. "expression": EXPRESSION,
  145. "title": TITLE,
  146. "description": DESCRIPTION,
  147. }
  148. class TestAccessBoundaryRule(object):
  149. def test_constructor(self):
  150. availability_condition = make_availability_condition(
  151. EXPRESSION, TITLE, DESCRIPTION
  152. )
  153. access_boundary_rule = make_access_boundary_rule(
  154. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  155. )
  156. assert access_boundary_rule.available_resource == AVAILABLE_RESOURCE
  157. assert access_boundary_rule.available_permissions == tuple(
  158. AVAILABLE_PERMISSIONS
  159. )
  160. assert access_boundary_rule.availability_condition == availability_condition
  161. def test_constructor_required_params_only(self):
  162. access_boundary_rule = make_access_boundary_rule(
  163. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS
  164. )
  165. assert access_boundary_rule.available_resource == AVAILABLE_RESOURCE
  166. assert access_boundary_rule.available_permissions == tuple(
  167. AVAILABLE_PERMISSIONS
  168. )
  169. assert access_boundary_rule.availability_condition is None
  170. def test_setters(self):
  171. availability_condition = make_availability_condition(
  172. EXPRESSION, TITLE, DESCRIPTION
  173. )
  174. other_availability_condition = make_availability_condition(
  175. OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION
  176. )
  177. access_boundary_rule = make_access_boundary_rule(
  178. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  179. )
  180. access_boundary_rule.available_resource = OTHER_AVAILABLE_RESOURCE
  181. access_boundary_rule.available_permissions = OTHER_AVAILABLE_PERMISSIONS
  182. access_boundary_rule.availability_condition = other_availability_condition
  183. assert access_boundary_rule.available_resource == OTHER_AVAILABLE_RESOURCE
  184. assert access_boundary_rule.available_permissions == tuple(
  185. OTHER_AVAILABLE_PERMISSIONS
  186. )
  187. assert (
  188. access_boundary_rule.availability_condition == other_availability_condition
  189. )
  190. def test_invalid_available_resource_type(self):
  191. availability_condition = make_availability_condition(
  192. EXPRESSION, TITLE, DESCRIPTION
  193. )
  194. with pytest.raises(TypeError) as excinfo:
  195. make_access_boundary_rule(
  196. None, AVAILABLE_PERMISSIONS, availability_condition
  197. )
  198. assert excinfo.match("The provided available_resource is not a string.")
  199. def test_invalid_available_permissions_type(self):
  200. availability_condition = make_availability_condition(
  201. EXPRESSION, TITLE, DESCRIPTION
  202. )
  203. with pytest.raises(TypeError) as excinfo:
  204. make_access_boundary_rule(
  205. AVAILABLE_RESOURCE, [0, 1, 2], availability_condition
  206. )
  207. assert excinfo.match(
  208. "Provided available_permissions are not a list of strings."
  209. )
  210. def test_invalid_available_permissions_value(self):
  211. availability_condition = make_availability_condition(
  212. EXPRESSION, TITLE, DESCRIPTION
  213. )
  214. with pytest.raises(ValueError) as excinfo:
  215. make_access_boundary_rule(
  216. AVAILABLE_RESOURCE,
  217. ["roles/storage.objectViewer"],
  218. availability_condition,
  219. )
  220. assert excinfo.match("available_permissions must be prefixed with 'inRole:'.")
  221. def test_invalid_availability_condition_type(self):
  222. with pytest.raises(TypeError) as excinfo:
  223. make_access_boundary_rule(
  224. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, {"foo": "bar"}
  225. )
  226. assert excinfo.match(
  227. "The provided availability_condition is not a 'google.auth.downscoped.AvailabilityCondition' or None."
  228. )
  229. def test_to_json(self):
  230. availability_condition = make_availability_condition(
  231. EXPRESSION, TITLE, DESCRIPTION
  232. )
  233. access_boundary_rule = make_access_boundary_rule(
  234. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  235. )
  236. assert access_boundary_rule.to_json() == {
  237. "availablePermissions": AVAILABLE_PERMISSIONS,
  238. "availableResource": AVAILABLE_RESOURCE,
  239. "availabilityCondition": {
  240. "expression": EXPRESSION,
  241. "title": TITLE,
  242. "description": DESCRIPTION,
  243. },
  244. }
  245. def test_to_json_required_params_only(self):
  246. access_boundary_rule = make_access_boundary_rule(
  247. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS
  248. )
  249. assert access_boundary_rule.to_json() == {
  250. "availablePermissions": AVAILABLE_PERMISSIONS,
  251. "availableResource": AVAILABLE_RESOURCE,
  252. }
  253. class TestCredentialAccessBoundary(object):
  254. def test_constructor(self):
  255. availability_condition = make_availability_condition(
  256. EXPRESSION, TITLE, DESCRIPTION
  257. )
  258. access_boundary_rule = make_access_boundary_rule(
  259. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  260. )
  261. rules = [access_boundary_rule]
  262. credential_access_boundary = make_credential_access_boundary(rules)
  263. assert credential_access_boundary.rules == tuple(rules)
  264. def test_setters(self):
  265. availability_condition = make_availability_condition(
  266. EXPRESSION, TITLE, DESCRIPTION
  267. )
  268. access_boundary_rule = make_access_boundary_rule(
  269. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  270. )
  271. rules = [access_boundary_rule]
  272. other_availability_condition = make_availability_condition(
  273. OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION
  274. )
  275. other_access_boundary_rule = make_access_boundary_rule(
  276. OTHER_AVAILABLE_RESOURCE,
  277. OTHER_AVAILABLE_PERMISSIONS,
  278. other_availability_condition,
  279. )
  280. other_rules = [other_access_boundary_rule]
  281. credential_access_boundary = make_credential_access_boundary(rules)
  282. credential_access_boundary.rules = other_rules
  283. assert credential_access_boundary.rules == tuple(other_rules)
  284. def test_add_rule(self):
  285. availability_condition = make_availability_condition(
  286. EXPRESSION, TITLE, DESCRIPTION
  287. )
  288. access_boundary_rule = make_access_boundary_rule(
  289. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  290. )
  291. rules = [access_boundary_rule] * 9
  292. credential_access_boundary = make_credential_access_boundary(rules)
  293. # Add one more rule. This should not raise an error.
  294. additional_access_boundary_rule = make_access_boundary_rule(
  295. OTHER_AVAILABLE_RESOURCE, OTHER_AVAILABLE_PERMISSIONS
  296. )
  297. credential_access_boundary.add_rule(additional_access_boundary_rule)
  298. assert len(credential_access_boundary.rules) == 10
  299. assert credential_access_boundary.rules[9] == additional_access_boundary_rule
  300. def test_add_rule_invalid_value(self):
  301. availability_condition = make_availability_condition(
  302. EXPRESSION, TITLE, DESCRIPTION
  303. )
  304. access_boundary_rule = make_access_boundary_rule(
  305. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  306. )
  307. rules = [access_boundary_rule] * 10
  308. credential_access_boundary = make_credential_access_boundary(rules)
  309. # Add one more rule to exceed maximum allowed rules.
  310. with pytest.raises(ValueError) as excinfo:
  311. credential_access_boundary.add_rule(access_boundary_rule)
  312. assert excinfo.match(
  313. "Credential access boundary rules can have a maximum of 10 rules."
  314. )
  315. assert len(credential_access_boundary.rules) == 10
  316. def test_add_rule_invalid_type(self):
  317. availability_condition = make_availability_condition(
  318. EXPRESSION, TITLE, DESCRIPTION
  319. )
  320. access_boundary_rule = make_access_boundary_rule(
  321. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  322. )
  323. rules = [access_boundary_rule]
  324. credential_access_boundary = make_credential_access_boundary(rules)
  325. # Add an invalid rule to exceed maximum allowed rules.
  326. with pytest.raises(TypeError) as excinfo:
  327. credential_access_boundary.add_rule("invalid")
  328. assert excinfo.match(
  329. "The provided rule does not contain a valid 'google.auth.downscoped.AccessBoundaryRule'."
  330. )
  331. assert len(credential_access_boundary.rules) == 1
  332. assert credential_access_boundary.rules[0] == access_boundary_rule
  333. def test_invalid_rules_type(self):
  334. with pytest.raises(TypeError) as excinfo:
  335. make_credential_access_boundary(["invalid"])
  336. assert excinfo.match(
  337. "List of rules provided do not contain a valid 'google.auth.downscoped.AccessBoundaryRule'."
  338. )
  339. def test_invalid_rules_value(self):
  340. availability_condition = make_availability_condition(
  341. EXPRESSION, TITLE, DESCRIPTION
  342. )
  343. access_boundary_rule = make_access_boundary_rule(
  344. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  345. )
  346. too_many_rules = [access_boundary_rule] * 11
  347. with pytest.raises(ValueError) as excinfo:
  348. make_credential_access_boundary(too_many_rules)
  349. assert excinfo.match(
  350. "Credential access boundary rules can have a maximum of 10 rules."
  351. )
  352. def test_to_json(self):
  353. availability_condition = make_availability_condition(
  354. EXPRESSION, TITLE, DESCRIPTION
  355. )
  356. access_boundary_rule = make_access_boundary_rule(
  357. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  358. )
  359. rules = [access_boundary_rule]
  360. credential_access_boundary = make_credential_access_boundary(rules)
  361. assert credential_access_boundary.to_json() == {
  362. "accessBoundary": {
  363. "accessBoundaryRules": [
  364. {
  365. "availablePermissions": AVAILABLE_PERMISSIONS,
  366. "availableResource": AVAILABLE_RESOURCE,
  367. "availabilityCondition": {
  368. "expression": EXPRESSION,
  369. "title": TITLE,
  370. "description": DESCRIPTION,
  371. },
  372. }
  373. ]
  374. }
  375. }
  376. class TestCredentials(object):
  377. @staticmethod
  378. def make_credentials(
  379. source_credentials=SourceCredentials(),
  380. quota_project_id=None,
  381. universe_domain=None,
  382. ):
  383. availability_condition = make_availability_condition(
  384. EXPRESSION, TITLE, DESCRIPTION
  385. )
  386. access_boundary_rule = make_access_boundary_rule(
  387. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  388. )
  389. rules = [access_boundary_rule]
  390. credential_access_boundary = make_credential_access_boundary(rules)
  391. return downscoped.Credentials(
  392. source_credentials,
  393. credential_access_boundary,
  394. quota_project_id,
  395. universe_domain,
  396. )
  397. @staticmethod
  398. def make_mock_request(data, status=http_client.OK):
  399. response = mock.create_autospec(transport.Response, instance=True)
  400. response.status = status
  401. response.data = json.dumps(data).encode("utf-8")
  402. request = mock.create_autospec(transport.Request)
  403. request.return_value = response
  404. return request
  405. @staticmethod
  406. def assert_request_kwargs(
  407. request_kwargs, headers, request_data, token_endpoint=TOKEN_EXCHANGE_ENDPOINT
  408. ):
  409. """Asserts the request was called with the expected parameters.
  410. """
  411. assert request_kwargs["url"] == token_endpoint
  412. assert request_kwargs["method"] == "POST"
  413. assert request_kwargs["headers"] == headers
  414. assert request_kwargs["body"] is not None
  415. body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
  416. for (k, v) in body_tuples:
  417. assert v.decode("utf-8") == request_data[k.decode("utf-8")]
  418. assert len(body_tuples) == len(request_data.keys())
  419. def test_default_state(self):
  420. credentials = self.make_credentials()
  421. # No token acquired yet.
  422. assert not credentials.token
  423. assert not credentials.valid
  424. # Expiration hasn't been set yet.
  425. assert not credentials.expiry
  426. assert not credentials.expired
  427. # No quota project ID set.
  428. assert not credentials.quota_project_id
  429. assert credentials.universe_domain == DEFAULT_UNIVERSE_DOMAIN
  430. def test_default_state_with_explicit_none_value(self):
  431. credentials = self.make_credentials(universe_domain=None)
  432. # No token acquired yet.
  433. assert not credentials.token
  434. assert not credentials.valid
  435. # Expiration hasn't been set yet.
  436. assert not credentials.expiry
  437. assert not credentials.expired
  438. # No quota project ID set.
  439. assert not credentials.quota_project_id
  440. assert credentials.universe_domain == DEFAULT_UNIVERSE_DOMAIN
  441. def test_create_with_customized_universe_domain(self):
  442. test_universe_domain = "foo.com"
  443. credentials = self.make_credentials(universe_domain=test_universe_domain)
  444. # No token acquired yet.
  445. assert not credentials.token
  446. assert not credentials.valid
  447. # Expiration hasn't been set yet.
  448. assert not credentials.expiry
  449. assert not credentials.expired
  450. # No quota project ID set.
  451. assert not credentials.quota_project_id
  452. assert credentials.universe_domain == test_universe_domain
  453. def test_with_quota_project(self):
  454. credentials = self.make_credentials()
  455. assert not credentials.quota_project_id
  456. quota_project_creds = credentials.with_quota_project("project-foo")
  457. assert quota_project_creds.quota_project_id == "project-foo"
  458. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  459. def test_refresh_on_custom_universe(self, unused_utcnow):
  460. test_universe_domain = "foo.com"
  461. response = SUCCESS_RESPONSE.copy()
  462. # Test custom expiration to confirm expiry is set correctly.
  463. response["expires_in"] = 2800
  464. expected_expiry = datetime.datetime.min + datetime.timedelta(
  465. seconds=response["expires_in"]
  466. )
  467. headers = {"Content-Type": "application/x-www-form-urlencoded"}
  468. request_data = {
  469. "grant_type": GRANT_TYPE,
  470. "subject_token": "ACCESS_TOKEN_1",
  471. "subject_token_type": SUBJECT_TOKEN_TYPE,
  472. "requested_token_type": REQUESTED_TOKEN_TYPE,
  473. "options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)),
  474. }
  475. request = self.make_mock_request(status=http_client.OK, data=response)
  476. source_credentials = SourceCredentials()
  477. credentials = self.make_credentials(
  478. source_credentials=source_credentials, universe_domain=test_universe_domain
  479. )
  480. token_exchange_endpoint = downscoped._STS_TOKEN_URL_PATTERN.format(
  481. test_universe_domain
  482. )
  483. # Spy on calls to source credentials refresh to confirm the expected request
  484. # instance is used.
  485. with mock.patch.object(
  486. source_credentials, "refresh", wraps=source_credentials.refresh
  487. ) as wrapped_souce_cred_refresh:
  488. credentials.refresh(request)
  489. self.assert_request_kwargs(
  490. request.call_args[1], headers, request_data, token_exchange_endpoint
  491. )
  492. assert credentials.valid
  493. assert credentials.expiry == expected_expiry
  494. assert not credentials.expired
  495. assert credentials.token == response["access_token"]
  496. # Confirm source credentials called with the same request instance.
  497. wrapped_souce_cred_refresh.assert_called_with(request)
  498. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  499. def test_refresh(self, unused_utcnow):
  500. response = SUCCESS_RESPONSE.copy()
  501. # Test custom expiration to confirm expiry is set correctly.
  502. response["expires_in"] = 2800
  503. expected_expiry = datetime.datetime.min + datetime.timedelta(
  504. seconds=response["expires_in"]
  505. )
  506. headers = {"Content-Type": "application/x-www-form-urlencoded"}
  507. request_data = {
  508. "grant_type": GRANT_TYPE,
  509. "subject_token": "ACCESS_TOKEN_1",
  510. "subject_token_type": SUBJECT_TOKEN_TYPE,
  511. "requested_token_type": REQUESTED_TOKEN_TYPE,
  512. "options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)),
  513. }
  514. request = self.make_mock_request(status=http_client.OK, data=response)
  515. source_credentials = SourceCredentials()
  516. credentials = self.make_credentials(source_credentials=source_credentials)
  517. # Spy on calls to source credentials refresh to confirm the expected request
  518. # instance is used.
  519. with mock.patch.object(
  520. source_credentials, "refresh", wraps=source_credentials.refresh
  521. ) as wrapped_souce_cred_refresh:
  522. credentials.refresh(request)
  523. self.assert_request_kwargs(request.call_args[1], headers, request_data)
  524. assert credentials.valid
  525. assert credentials.expiry == expected_expiry
  526. assert not credentials.expired
  527. assert credentials.token == response["access_token"]
  528. # Confirm source credentials called with the same request instance.
  529. wrapped_souce_cred_refresh.assert_called_with(request)
  530. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  531. def test_refresh_without_response_expires_in(self, unused_utcnow):
  532. response = SUCCESS_RESPONSE.copy()
  533. # Simulate the response is missing the expires_in field.
  534. # The downscoped token expiration should match the source credentials
  535. # expiration.
  536. del response["expires_in"]
  537. expected_expires_in = 1800
  538. # Simulate the source credentials generates a token with 1800 second
  539. # expiration time. The generated downscoped token should have the same
  540. # expiration time.
  541. source_credentials = SourceCredentials(expires_in=expected_expires_in)
  542. expected_expiry = datetime.datetime.min + datetime.timedelta(
  543. seconds=expected_expires_in
  544. )
  545. headers = {"Content-Type": "application/x-www-form-urlencoded"}
  546. request_data = {
  547. "grant_type": GRANT_TYPE,
  548. "subject_token": "ACCESS_TOKEN_1",
  549. "subject_token_type": SUBJECT_TOKEN_TYPE,
  550. "requested_token_type": REQUESTED_TOKEN_TYPE,
  551. "options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)),
  552. }
  553. request = self.make_mock_request(status=http_client.OK, data=response)
  554. credentials = self.make_credentials(source_credentials=source_credentials)
  555. # Spy on calls to source credentials refresh to confirm the expected request
  556. # instance is used.
  557. with mock.patch.object(
  558. source_credentials, "refresh", wraps=source_credentials.refresh
  559. ) as wrapped_souce_cred_refresh:
  560. credentials.refresh(request)
  561. self.assert_request_kwargs(request.call_args[1], headers, request_data)
  562. assert credentials.valid
  563. assert credentials.expiry == expected_expiry
  564. assert not credentials.expired
  565. assert credentials.token == response["access_token"]
  566. # Confirm source credentials called with the same request instance.
  567. wrapped_souce_cred_refresh.assert_called_with(request)
  568. def test_refresh_token_exchange_error(self):
  569. request = self.make_mock_request(
  570. status=http_client.BAD_REQUEST, data=ERROR_RESPONSE
  571. )
  572. credentials = self.make_credentials()
  573. with pytest.raises(exceptions.OAuthError) as excinfo:
  574. credentials.refresh(request)
  575. assert excinfo.match(
  576. r"Error code invalid_grant: Subject token is invalid. - https://tools.ietf.org/html/rfc6749"
  577. )
  578. assert not credentials.expired
  579. assert credentials.token is None
  580. def test_refresh_source_credentials_refresh_error(self):
  581. # Initialize downscoped credentials with source credentials that raise
  582. # an error on refresh.
  583. credentials = self.make_credentials(
  584. source_credentials=SourceCredentials(raise_error=True)
  585. )
  586. with pytest.raises(exceptions.RefreshError) as excinfo:
  587. credentials.refresh(mock.sentinel.request)
  588. assert excinfo.match(r"Failed to refresh access token in source credentials.")
  589. assert not credentials.expired
  590. assert credentials.token is None
  591. def test_apply_without_quota_project_id(self):
  592. headers = {}
  593. request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
  594. credentials = self.make_credentials()
  595. credentials.refresh(request)
  596. credentials.apply(headers)
  597. assert headers == {
  598. "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"])
  599. }
  600. def test_apply_with_quota_project_id(self):
  601. headers = {"other": "header-value"}
  602. request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
  603. credentials = self.make_credentials(quota_project_id=QUOTA_PROJECT_ID)
  604. credentials.refresh(request)
  605. credentials.apply(headers)
  606. assert headers == {
  607. "other": "header-value",
  608. "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
  609. "x-goog-user-project": QUOTA_PROJECT_ID,
  610. }
  611. def test_before_request(self):
  612. headers = {"other": "header-value"}
  613. request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
  614. credentials = self.make_credentials()
  615. # First call should call refresh, setting the token.
  616. credentials.before_request(request, "POST", "https://example.com/api", headers)
  617. assert headers == {
  618. "other": "header-value",
  619. "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
  620. }
  621. # Second call shouldn't call refresh (request should be untouched).
  622. credentials.before_request(
  623. mock.sentinel.request, "POST", "https://example.com/api", headers
  624. )
  625. assert headers == {
  626. "other": "header-value",
  627. "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
  628. }
  629. @mock.patch("google.auth._helpers.utcnow")
  630. def test_before_request_expired(self, utcnow):
  631. headers = {}
  632. request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
  633. credentials = self.make_credentials()
  634. credentials.token = "token"
  635. utcnow.return_value = datetime.datetime.min
  636. # Set the expiration to one second more than now plus the clock skew
  637. # accommodation. These credentials should be valid.
  638. credentials.expiry = (
  639. datetime.datetime.min
  640. + _helpers.REFRESH_THRESHOLD
  641. + datetime.timedelta(seconds=1)
  642. )
  643. assert credentials.valid
  644. assert not credentials.expired
  645. assert credentials.token_state == TokenState.FRESH
  646. credentials.before_request(request, "POST", "https://example.com/api", headers)
  647. # Cached token should be used.
  648. assert headers == {"authorization": "Bearer token"}
  649. # Next call should simulate 1 second passed.
  650. utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)
  651. assert not credentials.valid
  652. assert credentials.expired
  653. assert credentials.token_state == TokenState.STALE
  654. credentials.before_request(request, "POST", "https://example.com/api", headers)
  655. assert credentials.token_state == TokenState.FRESH
  656. # New token should be retrieved.
  657. assert headers == {
  658. "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"])
  659. }
  660. utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=6000)
  661. assert not credentials.valid
  662. assert credentials.expired
  663. assert credentials.token_state == TokenState.INVALID
  664. credentials.before_request(request, "POST", "https://example.com/api", headers)
  665. assert credentials.token_state == TokenState.FRESH
  666. # New token should be retrieved.
  667. assert headers == {
  668. "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"])
  669. }