test_downscoped.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694
  1. # Copyright 2021 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import datetime
  15. import json
  16. import mock
  17. import pytest
  18. from six.moves import http_client
  19. from six.moves import urllib
  20. from google.auth import _helpers
  21. from google.auth import credentials
  22. from google.auth import downscoped
  23. from google.auth import exceptions
  24. from google.auth import transport
  25. EXPRESSION = (
  26. "resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-a')"
  27. )
  28. TITLE = "customer-a-objects"
  29. DESCRIPTION = (
  30. "Condition to make permissions available for objects starting with customer-a"
  31. )
  32. AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/example-bucket"
  33. AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectViewer"]
  34. OTHER_EXPRESSION = (
  35. "resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-b')"
  36. )
  37. OTHER_TITLE = "customer-b-objects"
  38. OTHER_DESCRIPTION = (
  39. "Condition to make permissions available for objects starting with customer-b"
  40. )
  41. OTHER_AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/other-bucket"
  42. OTHER_AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectCreator"]
  43. QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
  44. GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
  45. REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
  46. TOKEN_EXCHANGE_ENDPOINT = "https://sts.googleapis.com/v1/token"
  47. SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
  48. SUCCESS_RESPONSE = {
  49. "access_token": "ACCESS_TOKEN",
  50. "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
  51. "token_type": "Bearer",
  52. "expires_in": 3600,
  53. }
  54. ERROR_RESPONSE = {
  55. "error": "invalid_grant",
  56. "error_description": "Subject token is invalid.",
  57. "error_uri": "https://tools.ietf.org/html/rfc6749",
  58. }
  59. CREDENTIAL_ACCESS_BOUNDARY_JSON = {
  60. "accessBoundary": {
  61. "accessBoundaryRules": [
  62. {
  63. "availablePermissions": AVAILABLE_PERMISSIONS,
  64. "availableResource": AVAILABLE_RESOURCE,
  65. "availabilityCondition": {
  66. "expression": EXPRESSION,
  67. "title": TITLE,
  68. "description": DESCRIPTION,
  69. },
  70. }
  71. ]
  72. }
  73. }
  74. class SourceCredentials(credentials.Credentials):
  75. def __init__(self, raise_error=False, expires_in=3600):
  76. super(SourceCredentials, self).__init__()
  77. self._counter = 0
  78. self._raise_error = raise_error
  79. self._expires_in = expires_in
  80. def refresh(self, request):
  81. if self._raise_error:
  82. raise exceptions.RefreshError(
  83. "Failed to refresh access token in source credentials."
  84. )
  85. now = _helpers.utcnow()
  86. self._counter += 1
  87. self.token = "ACCESS_TOKEN_{}".format(self._counter)
  88. self.expiry = now + datetime.timedelta(seconds=self._expires_in)
  89. def make_availability_condition(expression, title=None, description=None):
  90. return downscoped.AvailabilityCondition(expression, title, description)
  91. def make_access_boundary_rule(
  92. available_resource, available_permissions, availability_condition=None
  93. ):
  94. return downscoped.AccessBoundaryRule(
  95. available_resource, available_permissions, availability_condition
  96. )
  97. def make_credential_access_boundary(rules):
  98. return downscoped.CredentialAccessBoundary(rules)
  99. class TestAvailabilityCondition(object):
  100. def test_constructor(self):
  101. availability_condition = make_availability_condition(
  102. EXPRESSION, TITLE, DESCRIPTION
  103. )
  104. assert availability_condition.expression == EXPRESSION
  105. assert availability_condition.title == TITLE
  106. assert availability_condition.description == DESCRIPTION
  107. def test_constructor_required_params_only(self):
  108. availability_condition = make_availability_condition(EXPRESSION)
  109. assert availability_condition.expression == EXPRESSION
  110. assert availability_condition.title is None
  111. assert availability_condition.description is None
  112. def test_setters(self):
  113. availability_condition = make_availability_condition(
  114. EXPRESSION, TITLE, DESCRIPTION
  115. )
  116. availability_condition.expression = OTHER_EXPRESSION
  117. availability_condition.title = OTHER_TITLE
  118. availability_condition.description = OTHER_DESCRIPTION
  119. assert availability_condition.expression == OTHER_EXPRESSION
  120. assert availability_condition.title == OTHER_TITLE
  121. assert availability_condition.description == OTHER_DESCRIPTION
  122. def test_invalid_expression_type(self):
  123. with pytest.raises(TypeError) as excinfo:
  124. make_availability_condition([EXPRESSION], TITLE, DESCRIPTION)
  125. assert excinfo.match("The provided expression is not a string.")
  126. def test_invalid_title_type(self):
  127. with pytest.raises(TypeError) as excinfo:
  128. make_availability_condition(EXPRESSION, False, DESCRIPTION)
  129. assert excinfo.match("The provided title is not a string or None.")
  130. def test_invalid_description_type(self):
  131. with pytest.raises(TypeError) as excinfo:
  132. make_availability_condition(EXPRESSION, TITLE, False)
  133. assert excinfo.match("The provided description is not a string or None.")
  134. def test_to_json_required_params_only(self):
  135. availability_condition = make_availability_condition(EXPRESSION)
  136. assert availability_condition.to_json() == {"expression": EXPRESSION}
  137. def test_to_json_(self):
  138. availability_condition = make_availability_condition(
  139. EXPRESSION, TITLE, DESCRIPTION
  140. )
  141. assert availability_condition.to_json() == {
  142. "expression": EXPRESSION,
  143. "title": TITLE,
  144. "description": DESCRIPTION,
  145. }
  146. class TestAccessBoundaryRule(object):
  147. def test_constructor(self):
  148. availability_condition = make_availability_condition(
  149. EXPRESSION, TITLE, DESCRIPTION
  150. )
  151. access_boundary_rule = make_access_boundary_rule(
  152. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  153. )
  154. assert access_boundary_rule.available_resource == AVAILABLE_RESOURCE
  155. assert access_boundary_rule.available_permissions == tuple(
  156. AVAILABLE_PERMISSIONS
  157. )
  158. assert access_boundary_rule.availability_condition == availability_condition
  159. def test_constructor_required_params_only(self):
  160. access_boundary_rule = make_access_boundary_rule(
  161. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS
  162. )
  163. assert access_boundary_rule.available_resource == AVAILABLE_RESOURCE
  164. assert access_boundary_rule.available_permissions == tuple(
  165. AVAILABLE_PERMISSIONS
  166. )
  167. assert access_boundary_rule.availability_condition is None
  168. def test_setters(self):
  169. availability_condition = make_availability_condition(
  170. EXPRESSION, TITLE, DESCRIPTION
  171. )
  172. other_availability_condition = make_availability_condition(
  173. OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION
  174. )
  175. access_boundary_rule = make_access_boundary_rule(
  176. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  177. )
  178. access_boundary_rule.available_resource = OTHER_AVAILABLE_RESOURCE
  179. access_boundary_rule.available_permissions = OTHER_AVAILABLE_PERMISSIONS
  180. access_boundary_rule.availability_condition = other_availability_condition
  181. assert access_boundary_rule.available_resource == OTHER_AVAILABLE_RESOURCE
  182. assert access_boundary_rule.available_permissions == tuple(
  183. OTHER_AVAILABLE_PERMISSIONS
  184. )
  185. assert (
  186. access_boundary_rule.availability_condition == other_availability_condition
  187. )
  188. def test_invalid_available_resource_type(self):
  189. availability_condition = make_availability_condition(
  190. EXPRESSION, TITLE, DESCRIPTION
  191. )
  192. with pytest.raises(TypeError) as excinfo:
  193. make_access_boundary_rule(
  194. None, AVAILABLE_PERMISSIONS, availability_condition
  195. )
  196. assert excinfo.match("The provided available_resource is not a string.")
  197. def test_invalid_available_permissions_type(self):
  198. availability_condition = make_availability_condition(
  199. EXPRESSION, TITLE, DESCRIPTION
  200. )
  201. with pytest.raises(TypeError) as excinfo:
  202. make_access_boundary_rule(
  203. AVAILABLE_RESOURCE, [0, 1, 2], availability_condition
  204. )
  205. assert excinfo.match(
  206. "Provided available_permissions are not a list of strings."
  207. )
  208. def test_invalid_available_permissions_value(self):
  209. availability_condition = make_availability_condition(
  210. EXPRESSION, TITLE, DESCRIPTION
  211. )
  212. with pytest.raises(ValueError) as excinfo:
  213. make_access_boundary_rule(
  214. AVAILABLE_RESOURCE,
  215. ["roles/storage.objectViewer"],
  216. availability_condition,
  217. )
  218. assert excinfo.match("available_permissions must be prefixed with 'inRole:'.")
  219. def test_invalid_availability_condition_type(self):
  220. with pytest.raises(TypeError) as excinfo:
  221. make_access_boundary_rule(
  222. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, {"foo": "bar"}
  223. )
  224. assert excinfo.match(
  225. "The provided availability_condition is not a 'google.auth.downscoped.AvailabilityCondition' or None."
  226. )
  227. def test_to_json(self):
  228. availability_condition = make_availability_condition(
  229. EXPRESSION, TITLE, DESCRIPTION
  230. )
  231. access_boundary_rule = make_access_boundary_rule(
  232. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  233. )
  234. assert access_boundary_rule.to_json() == {
  235. "availablePermissions": AVAILABLE_PERMISSIONS,
  236. "availableResource": AVAILABLE_RESOURCE,
  237. "availabilityCondition": {
  238. "expression": EXPRESSION,
  239. "title": TITLE,
  240. "description": DESCRIPTION,
  241. },
  242. }
  243. def test_to_json_required_params_only(self):
  244. access_boundary_rule = make_access_boundary_rule(
  245. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS
  246. )
  247. assert access_boundary_rule.to_json() == {
  248. "availablePermissions": AVAILABLE_PERMISSIONS,
  249. "availableResource": AVAILABLE_RESOURCE,
  250. }
  251. class TestCredentialAccessBoundary(object):
  252. def test_constructor(self):
  253. availability_condition = make_availability_condition(
  254. EXPRESSION, TITLE, DESCRIPTION
  255. )
  256. access_boundary_rule = make_access_boundary_rule(
  257. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  258. )
  259. rules = [access_boundary_rule]
  260. credential_access_boundary = make_credential_access_boundary(rules)
  261. assert credential_access_boundary.rules == tuple(rules)
  262. def test_setters(self):
  263. availability_condition = make_availability_condition(
  264. EXPRESSION, TITLE, DESCRIPTION
  265. )
  266. access_boundary_rule = make_access_boundary_rule(
  267. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  268. )
  269. rules = [access_boundary_rule]
  270. other_availability_condition = make_availability_condition(
  271. OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION
  272. )
  273. other_access_boundary_rule = make_access_boundary_rule(
  274. OTHER_AVAILABLE_RESOURCE,
  275. OTHER_AVAILABLE_PERMISSIONS,
  276. other_availability_condition,
  277. )
  278. other_rules = [other_access_boundary_rule]
  279. credential_access_boundary = make_credential_access_boundary(rules)
  280. credential_access_boundary.rules = other_rules
  281. assert credential_access_boundary.rules == tuple(other_rules)
  282. def test_add_rule(self):
  283. availability_condition = make_availability_condition(
  284. EXPRESSION, TITLE, DESCRIPTION
  285. )
  286. access_boundary_rule = make_access_boundary_rule(
  287. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  288. )
  289. rules = [access_boundary_rule] * 9
  290. credential_access_boundary = make_credential_access_boundary(rules)
  291. # Add one more rule. This should not raise an error.
  292. additional_access_boundary_rule = make_access_boundary_rule(
  293. OTHER_AVAILABLE_RESOURCE, OTHER_AVAILABLE_PERMISSIONS
  294. )
  295. credential_access_boundary.add_rule(additional_access_boundary_rule)
  296. assert len(credential_access_boundary.rules) == 10
  297. assert credential_access_boundary.rules[9] == additional_access_boundary_rule
  298. def test_add_rule_invalid_value(self):
  299. availability_condition = make_availability_condition(
  300. EXPRESSION, TITLE, DESCRIPTION
  301. )
  302. access_boundary_rule = make_access_boundary_rule(
  303. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  304. )
  305. rules = [access_boundary_rule] * 10
  306. credential_access_boundary = make_credential_access_boundary(rules)
  307. # Add one more rule to exceed maximum allowed rules.
  308. with pytest.raises(ValueError) as excinfo:
  309. credential_access_boundary.add_rule(access_boundary_rule)
  310. assert excinfo.match(
  311. "Credential access boundary rules can have a maximum of 10 rules."
  312. )
  313. assert len(credential_access_boundary.rules) == 10
  314. def test_add_rule_invalid_type(self):
  315. availability_condition = make_availability_condition(
  316. EXPRESSION, TITLE, DESCRIPTION
  317. )
  318. access_boundary_rule = make_access_boundary_rule(
  319. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  320. )
  321. rules = [access_boundary_rule]
  322. credential_access_boundary = make_credential_access_boundary(rules)
  323. # Add an invalid rule to exceed maximum allowed rules.
  324. with pytest.raises(TypeError) as excinfo:
  325. credential_access_boundary.add_rule("invalid")
  326. assert excinfo.match(
  327. "The provided rule does not contain a valid 'google.auth.downscoped.AccessBoundaryRule'."
  328. )
  329. assert len(credential_access_boundary.rules) == 1
  330. assert credential_access_boundary.rules[0] == access_boundary_rule
  331. def test_invalid_rules_type(self):
  332. with pytest.raises(TypeError) as excinfo:
  333. make_credential_access_boundary(["invalid"])
  334. assert excinfo.match(
  335. "List of rules provided do not contain a valid 'google.auth.downscoped.AccessBoundaryRule'."
  336. )
  337. def test_invalid_rules_value(self):
  338. availability_condition = make_availability_condition(
  339. EXPRESSION, TITLE, DESCRIPTION
  340. )
  341. access_boundary_rule = make_access_boundary_rule(
  342. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  343. )
  344. too_many_rules = [access_boundary_rule] * 11
  345. with pytest.raises(ValueError) as excinfo:
  346. make_credential_access_boundary(too_many_rules)
  347. assert excinfo.match(
  348. "Credential access boundary rules can have a maximum of 10 rules."
  349. )
  350. def test_to_json(self):
  351. availability_condition = make_availability_condition(
  352. EXPRESSION, TITLE, DESCRIPTION
  353. )
  354. access_boundary_rule = make_access_boundary_rule(
  355. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  356. )
  357. rules = [access_boundary_rule]
  358. credential_access_boundary = make_credential_access_boundary(rules)
  359. assert credential_access_boundary.to_json() == {
  360. "accessBoundary": {
  361. "accessBoundaryRules": [
  362. {
  363. "availablePermissions": AVAILABLE_PERMISSIONS,
  364. "availableResource": AVAILABLE_RESOURCE,
  365. "availabilityCondition": {
  366. "expression": EXPRESSION,
  367. "title": TITLE,
  368. "description": DESCRIPTION,
  369. },
  370. }
  371. ]
  372. }
  373. }
  374. class TestCredentials(object):
  375. @staticmethod
  376. def make_credentials(source_credentials=SourceCredentials(), quota_project_id=None):
  377. availability_condition = make_availability_condition(
  378. EXPRESSION, TITLE, DESCRIPTION
  379. )
  380. access_boundary_rule = make_access_boundary_rule(
  381. AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
  382. )
  383. rules = [access_boundary_rule]
  384. credential_access_boundary = make_credential_access_boundary(rules)
  385. return downscoped.Credentials(
  386. source_credentials, credential_access_boundary, quota_project_id
  387. )
  388. @staticmethod
  389. def make_mock_request(data, status=http_client.OK):
  390. response = mock.create_autospec(transport.Response, instance=True)
  391. response.status = status
  392. response.data = json.dumps(data).encode("utf-8")
  393. request = mock.create_autospec(transport.Request)
  394. request.return_value = response
  395. return request
  396. @staticmethod
  397. def assert_request_kwargs(request_kwargs, headers, request_data):
  398. """Asserts the request was called with the expected parameters.
  399. """
  400. assert request_kwargs["url"] == TOKEN_EXCHANGE_ENDPOINT
  401. assert request_kwargs["method"] == "POST"
  402. assert request_kwargs["headers"] == headers
  403. assert request_kwargs["body"] is not None
  404. body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
  405. for (k, v) in body_tuples:
  406. assert v.decode("utf-8") == request_data[k.decode("utf-8")]
  407. assert len(body_tuples) == len(request_data.keys())
  408. def test_default_state(self):
  409. credentials = self.make_credentials()
  410. # No token acquired yet.
  411. assert not credentials.token
  412. assert not credentials.valid
  413. # Expiration hasn't been set yet.
  414. assert not credentials.expiry
  415. assert not credentials.expired
  416. # No quota project ID set.
  417. assert not credentials.quota_project_id
  418. def test_with_quota_project(self):
  419. credentials = self.make_credentials()
  420. assert not credentials.quota_project_id
  421. quota_project_creds = credentials.with_quota_project("project-foo")
  422. assert quota_project_creds.quota_project_id == "project-foo"
  423. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  424. def test_refresh(self, unused_utcnow):
  425. response = SUCCESS_RESPONSE.copy()
  426. # Test custom expiration to confirm expiry is set correctly.
  427. response["expires_in"] = 2800
  428. expected_expiry = datetime.datetime.min + datetime.timedelta(
  429. seconds=response["expires_in"]
  430. )
  431. headers = {"Content-Type": "application/x-www-form-urlencoded"}
  432. request_data = {
  433. "grant_type": GRANT_TYPE,
  434. "subject_token": "ACCESS_TOKEN_1",
  435. "subject_token_type": SUBJECT_TOKEN_TYPE,
  436. "requested_token_type": REQUESTED_TOKEN_TYPE,
  437. "options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)),
  438. }
  439. request = self.make_mock_request(status=http_client.OK, data=response)
  440. source_credentials = SourceCredentials()
  441. credentials = self.make_credentials(source_credentials=source_credentials)
  442. # Spy on calls to source credentials refresh to confirm the expected request
  443. # instance is used.
  444. with mock.patch.object(
  445. source_credentials, "refresh", wraps=source_credentials.refresh
  446. ) as wrapped_souce_cred_refresh:
  447. credentials.refresh(request)
  448. self.assert_request_kwargs(request.call_args[1], headers, request_data)
  449. assert credentials.valid
  450. assert credentials.expiry == expected_expiry
  451. assert not credentials.expired
  452. assert credentials.token == response["access_token"]
  453. # Confirm source credentials called with the same request instance.
  454. wrapped_souce_cred_refresh.assert_called_with(request)
  455. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  456. def test_refresh_without_response_expires_in(self, unused_utcnow):
  457. response = SUCCESS_RESPONSE.copy()
  458. # Simulate the response is missing the expires_in field.
  459. # The downscoped token expiration should match the source credentials
  460. # expiration.
  461. del response["expires_in"]
  462. expected_expires_in = 1800
  463. # Simulate the source credentials generates a token with 1800 second
  464. # expiration time. The generated downscoped token should have the same
  465. # expiration time.
  466. source_credentials = SourceCredentials(expires_in=expected_expires_in)
  467. expected_expiry = datetime.datetime.min + datetime.timedelta(
  468. seconds=expected_expires_in
  469. )
  470. headers = {"Content-Type": "application/x-www-form-urlencoded"}
  471. request_data = {
  472. "grant_type": GRANT_TYPE,
  473. "subject_token": "ACCESS_TOKEN_1",
  474. "subject_token_type": SUBJECT_TOKEN_TYPE,
  475. "requested_token_type": REQUESTED_TOKEN_TYPE,
  476. "options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)),
  477. }
  478. request = self.make_mock_request(status=http_client.OK, data=response)
  479. credentials = self.make_credentials(source_credentials=source_credentials)
  480. # Spy on calls to source credentials refresh to confirm the expected request
  481. # instance is used.
  482. with mock.patch.object(
  483. source_credentials, "refresh", wraps=source_credentials.refresh
  484. ) as wrapped_souce_cred_refresh:
  485. credentials.refresh(request)
  486. self.assert_request_kwargs(request.call_args[1], headers, request_data)
  487. assert credentials.valid
  488. assert credentials.expiry == expected_expiry
  489. assert not credentials.expired
  490. assert credentials.token == response["access_token"]
  491. # Confirm source credentials called with the same request instance.
  492. wrapped_souce_cred_refresh.assert_called_with(request)
  493. def test_refresh_token_exchange_error(self):
  494. request = self.make_mock_request(
  495. status=http_client.BAD_REQUEST, data=ERROR_RESPONSE
  496. )
  497. credentials = self.make_credentials()
  498. with pytest.raises(exceptions.OAuthError) as excinfo:
  499. credentials.refresh(request)
  500. assert excinfo.match(
  501. r"Error code invalid_grant: Subject token is invalid. - https://tools.ietf.org/html/rfc6749"
  502. )
  503. assert not credentials.expired
  504. assert credentials.token is None
  505. def test_refresh_source_credentials_refresh_error(self):
  506. # Initialize downscoped credentials with source credentials that raise
  507. # an error on refresh.
  508. credentials = self.make_credentials(
  509. source_credentials=SourceCredentials(raise_error=True)
  510. )
  511. with pytest.raises(exceptions.RefreshError) as excinfo:
  512. credentials.refresh(mock.sentinel.request)
  513. assert excinfo.match(r"Failed to refresh access token in source credentials.")
  514. assert not credentials.expired
  515. assert credentials.token is None
  516. def test_apply_without_quota_project_id(self):
  517. headers = {}
  518. request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
  519. credentials = self.make_credentials()
  520. credentials.refresh(request)
  521. credentials.apply(headers)
  522. assert headers == {
  523. "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"])
  524. }
  525. def test_apply_with_quota_project_id(self):
  526. headers = {"other": "header-value"}
  527. request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
  528. credentials = self.make_credentials(quota_project_id=QUOTA_PROJECT_ID)
  529. credentials.refresh(request)
  530. credentials.apply(headers)
  531. assert headers == {
  532. "other": "header-value",
  533. "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
  534. "x-goog-user-project": QUOTA_PROJECT_ID,
  535. }
  536. def test_before_request(self):
  537. headers = {"other": "header-value"}
  538. request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
  539. credentials = self.make_credentials()
  540. # First call should call refresh, setting the token.
  541. credentials.before_request(request, "POST", "https://example.com/api", headers)
  542. assert headers == {
  543. "other": "header-value",
  544. "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
  545. }
  546. # Second call shouldn't call refresh (request should be untouched).
  547. credentials.before_request(
  548. mock.sentinel.request, "POST", "https://example.com/api", headers
  549. )
  550. assert headers == {
  551. "other": "header-value",
  552. "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
  553. }
  554. @mock.patch("google.auth._helpers.utcnow")
  555. def test_before_request_expired(self, utcnow):
  556. headers = {}
  557. request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
  558. credentials = self.make_credentials()
  559. credentials.token = "token"
  560. utcnow.return_value = datetime.datetime.min
  561. # Set the expiration to one second more than now plus the clock skew
  562. # accommodation. These credentials should be valid.
  563. credentials.expiry = (
  564. datetime.datetime.min + _helpers.CLOCK_SKEW + datetime.timedelta(seconds=1)
  565. )
  566. assert credentials.valid
  567. assert not credentials.expired
  568. credentials.before_request(request, "POST", "https://example.com/api", headers)
  569. # Cached token should be used.
  570. assert headers == {"authorization": "Bearer token"}
  571. # Next call should simulate 1 second passed.
  572. utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)
  573. assert not credentials.valid
  574. assert credentials.expired
  575. credentials.before_request(request, "POST", "https://example.com/api", headers)
  576. # New token should be retrieved.
  577. assert headers == {
  578. "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"])
  579. }