# test_credentials.py (31 KB)
  1. # Copyright 2016 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import datetime
  15. import json
  16. import os
  17. import pickle
  18. import sys
  19. import mock
  20. import pytest
  21. from google.auth import _helpers
  22. from google.auth import exceptions
  23. from google.auth import transport
  24. from google.oauth2 import credentials
  25. import yatest.common
  26. DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
  27. AUTH_USER_JSON_FILE = os.path.join(DATA_DIR, "authorized_user.json")
  28. with open(AUTH_USER_JSON_FILE, "r") as fh:
  29. AUTH_USER_INFO = json.load(fh)
  30. class TestCredentials(object):
  31. TOKEN_URI = "https://example.com/oauth2/token"
  32. REFRESH_TOKEN = "refresh_token"
  33. RAPT_TOKEN = "rapt_token"
  34. CLIENT_ID = "client_id"
  35. CLIENT_SECRET = "client_secret"
  36. @classmethod
  37. def make_credentials(cls):
  38. return credentials.Credentials(
  39. token=None,
  40. refresh_token=cls.REFRESH_TOKEN,
  41. token_uri=cls.TOKEN_URI,
  42. client_id=cls.CLIENT_ID,
  43. client_secret=cls.CLIENT_SECRET,
  44. rapt_token=cls.RAPT_TOKEN,
  45. )
  46. def test_default_state(self):
  47. credentials = self.make_credentials()
  48. assert not credentials.valid
  49. # Expiration hasn't been set yet
  50. assert not credentials.expired
  51. # Scopes aren't required for these credentials
  52. assert not credentials.requires_scopes
  53. # Test properties
  54. assert credentials.refresh_token == self.REFRESH_TOKEN
  55. assert credentials.token_uri == self.TOKEN_URI
  56. assert credentials.client_id == self.CLIENT_ID
  57. assert credentials.client_secret == self.CLIENT_SECRET
  58. assert credentials.rapt_token == self.RAPT_TOKEN
  59. assert credentials.refresh_handler is None
  60. def test_refresh_handler_setter_and_getter(self):
  61. scopes = ["email", "profile"]
  62. original_refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN_1", None))
  63. updated_refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN_2", None))
  64. creds = credentials.Credentials(
  65. token=None,
  66. refresh_token=None,
  67. token_uri=None,
  68. client_id=None,
  69. client_secret=None,
  70. rapt_token=None,
  71. scopes=scopes,
  72. default_scopes=None,
  73. refresh_handler=original_refresh_handler,
  74. )
  75. assert creds.refresh_handler is original_refresh_handler
  76. creds.refresh_handler = updated_refresh_handler
  77. assert creds.refresh_handler is updated_refresh_handler
  78. creds.refresh_handler = None
  79. assert creds.refresh_handler is None
  80. def test_invalid_refresh_handler(self):
  81. scopes = ["email", "profile"]
  82. with pytest.raises(TypeError) as excinfo:
  83. credentials.Credentials(
  84. token=None,
  85. refresh_token=None,
  86. token_uri=None,
  87. client_id=None,
  88. client_secret=None,
  89. rapt_token=None,
  90. scopes=scopes,
  91. default_scopes=None,
  92. refresh_handler=object(),
  93. )
  94. assert excinfo.match("The provided refresh_handler is not a callable or None.")
  95. @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
  96. @mock.patch(
  97. "google.auth._helpers.utcnow",
  98. return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
  99. )
  100. def test_refresh_success(self, unused_utcnow, refresh_grant):
  101. token = "token"
  102. new_rapt_token = "new_rapt_token"
  103. expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
  104. grant_response = {"id_token": mock.sentinel.id_token}
  105. refresh_grant.return_value = (
  106. # Access token
  107. token,
  108. # New refresh token
  109. None,
  110. # Expiry,
  111. expiry,
  112. # Extra data
  113. grant_response,
  114. # rapt_token
  115. new_rapt_token,
  116. )
  117. request = mock.create_autospec(transport.Request)
  118. credentials = self.make_credentials()
  119. # Refresh credentials
  120. credentials.refresh(request)
  121. # Check jwt grant call.
  122. refresh_grant.assert_called_with(
  123. request,
  124. self.TOKEN_URI,
  125. self.REFRESH_TOKEN,
  126. self.CLIENT_ID,
  127. self.CLIENT_SECRET,
  128. None,
  129. self.RAPT_TOKEN,
  130. )
  131. # Check that the credentials have the token and expiry
  132. assert credentials.token == token
  133. assert credentials.expiry == expiry
  134. assert credentials.id_token == mock.sentinel.id_token
  135. assert credentials.rapt_token == new_rapt_token
  136. # Check that the credentials are valid (have a token and are not
  137. # expired)
  138. assert credentials.valid
  139. def test_refresh_no_refresh_token(self):
  140. request = mock.create_autospec(transport.Request)
  141. credentials_ = credentials.Credentials(token=None, refresh_token=None)
  142. with pytest.raises(exceptions.RefreshError, match="necessary fields"):
  143. credentials_.refresh(request)
  144. request.assert_not_called()
  145. @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
  146. @mock.patch(
  147. "google.auth._helpers.utcnow",
  148. return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
  149. )
  150. def test_refresh_with_refresh_token_and_refresh_handler(
  151. self, unused_utcnow, refresh_grant
  152. ):
  153. token = "token"
  154. new_rapt_token = "new_rapt_token"
  155. expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
  156. grant_response = {"id_token": mock.sentinel.id_token}
  157. refresh_grant.return_value = (
  158. # Access token
  159. token,
  160. # New refresh token
  161. None,
  162. # Expiry,
  163. expiry,
  164. # Extra data
  165. grant_response,
  166. # rapt_token
  167. new_rapt_token,
  168. )
  169. refresh_handler = mock.Mock()
  170. request = mock.create_autospec(transport.Request)
  171. creds = credentials.Credentials(
  172. token=None,
  173. refresh_token=self.REFRESH_TOKEN,
  174. token_uri=self.TOKEN_URI,
  175. client_id=self.CLIENT_ID,
  176. client_secret=self.CLIENT_SECRET,
  177. rapt_token=self.RAPT_TOKEN,
  178. refresh_handler=refresh_handler,
  179. )
  180. # Refresh credentials
  181. creds.refresh(request)
  182. # Check jwt grant call.
  183. refresh_grant.assert_called_with(
  184. request,
  185. self.TOKEN_URI,
  186. self.REFRESH_TOKEN,
  187. self.CLIENT_ID,
  188. self.CLIENT_SECRET,
  189. None,
  190. self.RAPT_TOKEN,
  191. )
  192. # Check that the credentials have the token and expiry
  193. assert creds.token == token
  194. assert creds.expiry == expiry
  195. assert creds.id_token == mock.sentinel.id_token
  196. assert creds.rapt_token == new_rapt_token
  197. # Check that the credentials are valid (have a token and are not
  198. # expired)
  199. assert creds.valid
  200. # Assert refresh handler not called as the refresh token has
  201. # higher priority.
  202. refresh_handler.assert_not_called()
  203. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  204. def test_refresh_with_refresh_handler_success_scopes(self, unused_utcnow):
  205. expected_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800)
  206. refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN", expected_expiry))
  207. scopes = ["email", "profile"]
  208. default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
  209. request = mock.create_autospec(transport.Request)
  210. creds = credentials.Credentials(
  211. token=None,
  212. refresh_token=None,
  213. token_uri=None,
  214. client_id=None,
  215. client_secret=None,
  216. rapt_token=None,
  217. scopes=scopes,
  218. default_scopes=default_scopes,
  219. refresh_handler=refresh_handler,
  220. )
  221. creds.refresh(request)
  222. assert creds.token == "ACCESS_TOKEN"
  223. assert creds.expiry == expected_expiry
  224. assert creds.valid
  225. assert not creds.expired
  226. # Confirm refresh handler called with the expected arguments.
  227. refresh_handler.assert_called_with(request, scopes=scopes)
  228. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  229. def test_refresh_with_refresh_handler_success_default_scopes(self, unused_utcnow):
  230. expected_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800)
  231. original_refresh_handler = mock.Mock(
  232. return_value=("UNUSED_TOKEN", expected_expiry)
  233. )
  234. refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN", expected_expiry))
  235. default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
  236. request = mock.create_autospec(transport.Request)
  237. creds = credentials.Credentials(
  238. token=None,
  239. refresh_token=None,
  240. token_uri=None,
  241. client_id=None,
  242. client_secret=None,
  243. rapt_token=None,
  244. scopes=None,
  245. default_scopes=default_scopes,
  246. refresh_handler=original_refresh_handler,
  247. )
  248. # Test newly set refresh_handler is used instead of the original one.
  249. creds.refresh_handler = refresh_handler
  250. creds.refresh(request)
  251. assert creds.token == "ACCESS_TOKEN"
  252. assert creds.expiry == expected_expiry
  253. assert creds.valid
  254. assert not creds.expired
  255. # default_scopes should be used since no developer provided scopes
  256. # are provided.
  257. refresh_handler.assert_called_with(request, scopes=default_scopes)
  258. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  259. def test_refresh_with_refresh_handler_invalid_token(self, unused_utcnow):
  260. expected_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800)
  261. # Simulate refresh handler does not return a valid token.
  262. refresh_handler = mock.Mock(return_value=(None, expected_expiry))
  263. scopes = ["email", "profile"]
  264. default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
  265. request = mock.create_autospec(transport.Request)
  266. creds = credentials.Credentials(
  267. token=None,
  268. refresh_token=None,
  269. token_uri=None,
  270. client_id=None,
  271. client_secret=None,
  272. rapt_token=None,
  273. scopes=scopes,
  274. default_scopes=default_scopes,
  275. refresh_handler=refresh_handler,
  276. )
  277. with pytest.raises(
  278. exceptions.RefreshError, match="returned token is not a string"
  279. ):
  280. creds.refresh(request)
  281. assert creds.token is None
  282. assert creds.expiry is None
  283. assert not creds.valid
  284. # Confirm refresh handler called with the expected arguments.
  285. refresh_handler.assert_called_with(request, scopes=scopes)
  286. def test_refresh_with_refresh_handler_invalid_expiry(self):
  287. # Simulate refresh handler returns expiration time in an invalid unit.
  288. refresh_handler = mock.Mock(return_value=("TOKEN", 2800))
  289. scopes = ["email", "profile"]
  290. default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
  291. request = mock.create_autospec(transport.Request)
  292. creds = credentials.Credentials(
  293. token=None,
  294. refresh_token=None,
  295. token_uri=None,
  296. client_id=None,
  297. client_secret=None,
  298. rapt_token=None,
  299. scopes=scopes,
  300. default_scopes=default_scopes,
  301. refresh_handler=refresh_handler,
  302. )
  303. with pytest.raises(
  304. exceptions.RefreshError, match="returned expiry is not a datetime object"
  305. ):
  306. creds.refresh(request)
  307. assert creds.token is None
  308. assert creds.expiry is None
  309. assert not creds.valid
  310. # Confirm refresh handler called with the expected arguments.
  311. refresh_handler.assert_called_with(request, scopes=scopes)
  312. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  313. def test_refresh_with_refresh_handler_expired_token(self, unused_utcnow):
  314. expected_expiry = datetime.datetime.min + _helpers.CLOCK_SKEW
  315. # Simulate refresh handler returns an expired token.
  316. refresh_handler = mock.Mock(return_value=("TOKEN", expected_expiry))
  317. scopes = ["email", "profile"]
  318. default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
  319. request = mock.create_autospec(transport.Request)
  320. creds = credentials.Credentials(
  321. token=None,
  322. refresh_token=None,
  323. token_uri=None,
  324. client_id=None,
  325. client_secret=None,
  326. rapt_token=None,
  327. scopes=scopes,
  328. default_scopes=default_scopes,
  329. refresh_handler=refresh_handler,
  330. )
  331. with pytest.raises(exceptions.RefreshError, match="already expired"):
  332. creds.refresh(request)
  333. assert creds.token is None
  334. assert creds.expiry is None
  335. assert not creds.valid
  336. # Confirm refresh handler called with the expected arguments.
  337. refresh_handler.assert_called_with(request, scopes=scopes)
  338. @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
  339. @mock.patch(
  340. "google.auth._helpers.utcnow",
  341. return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
  342. )
  343. def test_credentials_with_scopes_requested_refresh_success(
  344. self, unused_utcnow, refresh_grant
  345. ):
  346. scopes = ["email", "profile"]
  347. default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
  348. token = "token"
  349. new_rapt_token = "new_rapt_token"
  350. expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
  351. grant_response = {"id_token": mock.sentinel.id_token, "scope": "email profile"}
  352. refresh_grant.return_value = (
  353. # Access token
  354. token,
  355. # New refresh token
  356. None,
  357. # Expiry,
  358. expiry,
  359. # Extra data
  360. grant_response,
  361. # rapt token
  362. new_rapt_token,
  363. )
  364. request = mock.create_autospec(transport.Request)
  365. creds = credentials.Credentials(
  366. token=None,
  367. refresh_token=self.REFRESH_TOKEN,
  368. token_uri=self.TOKEN_URI,
  369. client_id=self.CLIENT_ID,
  370. client_secret=self.CLIENT_SECRET,
  371. scopes=scopes,
  372. default_scopes=default_scopes,
  373. rapt_token=self.RAPT_TOKEN,
  374. )
  375. # Refresh credentials
  376. creds.refresh(request)
  377. # Check jwt grant call.
  378. refresh_grant.assert_called_with(
  379. request,
  380. self.TOKEN_URI,
  381. self.REFRESH_TOKEN,
  382. self.CLIENT_ID,
  383. self.CLIENT_SECRET,
  384. scopes,
  385. self.RAPT_TOKEN,
  386. )
  387. # Check that the credentials have the token and expiry
  388. assert creds.token == token
  389. assert creds.expiry == expiry
  390. assert creds.id_token == mock.sentinel.id_token
  391. assert creds.has_scopes(scopes)
  392. assert creds.rapt_token == new_rapt_token
  393. # Check that the credentials are valid (have a token and are not
  394. # expired.)
  395. assert creds.valid
  396. @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
  397. @mock.patch(
  398. "google.auth._helpers.utcnow",
  399. return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
  400. )
  401. def test_credentials_with_only_default_scopes_requested(
  402. self, unused_utcnow, refresh_grant
  403. ):
  404. default_scopes = ["email", "profile"]
  405. token = "token"
  406. new_rapt_token = "new_rapt_token"
  407. expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
  408. grant_response = {"id_token": mock.sentinel.id_token}
  409. refresh_grant.return_value = (
  410. # Access token
  411. token,
  412. # New refresh token
  413. None,
  414. # Expiry,
  415. expiry,
  416. # Extra data
  417. grant_response,
  418. # rapt token
  419. new_rapt_token,
  420. )
  421. request = mock.create_autospec(transport.Request)
  422. creds = credentials.Credentials(
  423. token=None,
  424. refresh_token=self.REFRESH_TOKEN,
  425. token_uri=self.TOKEN_URI,
  426. client_id=self.CLIENT_ID,
  427. client_secret=self.CLIENT_SECRET,
  428. default_scopes=default_scopes,
  429. rapt_token=self.RAPT_TOKEN,
  430. )
  431. # Refresh credentials
  432. creds.refresh(request)
  433. # Check jwt grant call.
  434. refresh_grant.assert_called_with(
  435. request,
  436. self.TOKEN_URI,
  437. self.REFRESH_TOKEN,
  438. self.CLIENT_ID,
  439. self.CLIENT_SECRET,
  440. default_scopes,
  441. self.RAPT_TOKEN,
  442. )
  443. # Check that the credentials have the token and expiry
  444. assert creds.token == token
  445. assert creds.expiry == expiry
  446. assert creds.id_token == mock.sentinel.id_token
  447. assert creds.has_scopes(default_scopes)
  448. assert creds.rapt_token == new_rapt_token
  449. # Check that the credentials are valid (have a token and are not
  450. # expired.)
  451. assert creds.valid
  452. @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
  453. @mock.patch(
  454. "google.auth._helpers.utcnow",
  455. return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
  456. )
  457. def test_credentials_with_scopes_returned_refresh_success(
  458. self, unused_utcnow, refresh_grant
  459. ):
  460. scopes = ["email", "profile"]
  461. token = "token"
  462. new_rapt_token = "new_rapt_token"
  463. expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
  464. grant_response = {
  465. "id_token": mock.sentinel.id_token,
  466. "scopes": " ".join(scopes),
  467. }
  468. refresh_grant.return_value = (
  469. # Access token
  470. token,
  471. # New refresh token
  472. None,
  473. # Expiry,
  474. expiry,
  475. # Extra data
  476. grant_response,
  477. # rapt token
  478. new_rapt_token,
  479. )
  480. request = mock.create_autospec(transport.Request)
  481. creds = credentials.Credentials(
  482. token=None,
  483. refresh_token=self.REFRESH_TOKEN,
  484. token_uri=self.TOKEN_URI,
  485. client_id=self.CLIENT_ID,
  486. client_secret=self.CLIENT_SECRET,
  487. scopes=scopes,
  488. rapt_token=self.RAPT_TOKEN,
  489. )
  490. # Refresh credentials
  491. creds.refresh(request)
  492. # Check jwt grant call.
  493. refresh_grant.assert_called_with(
  494. request,
  495. self.TOKEN_URI,
  496. self.REFRESH_TOKEN,
  497. self.CLIENT_ID,
  498. self.CLIENT_SECRET,
  499. scopes,
  500. self.RAPT_TOKEN,
  501. )
  502. # Check that the credentials have the token and expiry
  503. assert creds.token == token
  504. assert creds.expiry == expiry
  505. assert creds.id_token == mock.sentinel.id_token
  506. assert creds.has_scopes(scopes)
  507. assert creds.rapt_token == new_rapt_token
  508. # Check that the credentials are valid (have a token and are not
  509. # expired.)
  510. assert creds.valid
  511. @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
  512. @mock.patch(
  513. "google.auth._helpers.utcnow",
  514. return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
  515. )
  516. def test_credentials_with_scopes_refresh_failure_raises_refresh_error(
  517. self, unused_utcnow, refresh_grant
  518. ):
  519. scopes = ["email", "profile"]
  520. scopes_returned = ["email"]
  521. token = "token"
  522. new_rapt_token = "new_rapt_token"
  523. expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
  524. grant_response = {
  525. "id_token": mock.sentinel.id_token,
  526. "scope": " ".join(scopes_returned),
  527. }
  528. refresh_grant.return_value = (
  529. # Access token
  530. token,
  531. # New refresh token
  532. None,
  533. # Expiry,
  534. expiry,
  535. # Extra data
  536. grant_response,
  537. # rapt token
  538. new_rapt_token,
  539. )
  540. request = mock.create_autospec(transport.Request)
  541. creds = credentials.Credentials(
  542. token=None,
  543. refresh_token=self.REFRESH_TOKEN,
  544. token_uri=self.TOKEN_URI,
  545. client_id=self.CLIENT_ID,
  546. client_secret=self.CLIENT_SECRET,
  547. scopes=scopes,
  548. rapt_token=self.RAPT_TOKEN,
  549. )
  550. # Refresh credentials
  551. with pytest.raises(
  552. exceptions.RefreshError, match="Not all requested scopes were granted"
  553. ):
  554. creds.refresh(request)
  555. # Check jwt grant call.
  556. refresh_grant.assert_called_with(
  557. request,
  558. self.TOKEN_URI,
  559. self.REFRESH_TOKEN,
  560. self.CLIENT_ID,
  561. self.CLIENT_SECRET,
  562. scopes,
  563. self.RAPT_TOKEN,
  564. )
  565. # Check that the credentials have the token and expiry
  566. assert creds.token == token
  567. assert creds.expiry == expiry
  568. assert creds.id_token == mock.sentinel.id_token
  569. assert creds.has_scopes(scopes)
  570. assert creds.rapt_token == new_rapt_token
  571. # Check that the credentials are valid (have a token and are not
  572. # expired.)
  573. assert creds.valid
  574. def test_apply_with_quota_project_id(self):
  575. creds = credentials.Credentials(
  576. token="token",
  577. refresh_token=self.REFRESH_TOKEN,
  578. token_uri=self.TOKEN_URI,
  579. client_id=self.CLIENT_ID,
  580. client_secret=self.CLIENT_SECRET,
  581. quota_project_id="quota-project-123",
  582. )
  583. headers = {}
  584. creds.apply(headers)
  585. assert headers["x-goog-user-project"] == "quota-project-123"
  586. assert "token" in headers["authorization"]
  587. def test_apply_with_no_quota_project_id(self):
  588. creds = credentials.Credentials(
  589. token="token",
  590. refresh_token=self.REFRESH_TOKEN,
  591. token_uri=self.TOKEN_URI,
  592. client_id=self.CLIENT_ID,
  593. client_secret=self.CLIENT_SECRET,
  594. )
  595. headers = {}
  596. creds.apply(headers)
  597. assert "x-goog-user-project" not in headers
  598. assert "token" in headers["authorization"]
  599. def test_with_quota_project(self):
  600. creds = credentials.Credentials(
  601. token="token",
  602. refresh_token=self.REFRESH_TOKEN,
  603. token_uri=self.TOKEN_URI,
  604. client_id=self.CLIENT_ID,
  605. client_secret=self.CLIENT_SECRET,
  606. quota_project_id="quota-project-123",
  607. )
  608. new_creds = creds.with_quota_project("new-project-456")
  609. assert new_creds.quota_project_id == "new-project-456"
  610. headers = {}
  611. creds.apply(headers)
  612. assert "x-goog-user-project" in headers
  613. def test_from_authorized_user_info(self):
  614. info = AUTH_USER_INFO.copy()
  615. creds = credentials.Credentials.from_authorized_user_info(info)
  616. assert creds.client_secret == info["client_secret"]
  617. assert creds.client_id == info["client_id"]
  618. assert creds.refresh_token == info["refresh_token"]
  619. assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
  620. assert creds.scopes is None
  621. scopes = ["email", "profile"]
  622. creds = credentials.Credentials.from_authorized_user_info(info, scopes)
  623. assert creds.client_secret == info["client_secret"]
  624. assert creds.client_id == info["client_id"]
  625. assert creds.refresh_token == info["refresh_token"]
  626. assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
  627. assert creds.scopes == scopes
  628. info["scopes"] = "email" # single non-array scope from file
  629. creds = credentials.Credentials.from_authorized_user_info(info)
  630. assert creds.scopes == [info["scopes"]]
  631. info["scopes"] = ["email", "profile"] # array scope from file
  632. creds = credentials.Credentials.from_authorized_user_info(info)
  633. assert creds.scopes == info["scopes"]
  634. expiry = datetime.datetime(2020, 8, 14, 15, 54, 1)
  635. info["expiry"] = expiry.isoformat() + "Z"
  636. creds = credentials.Credentials.from_authorized_user_info(info)
  637. assert creds.expiry == expiry
  638. assert creds.expired
  639. def test_from_authorized_user_file(self):
  640. info = AUTH_USER_INFO.copy()
  641. creds = credentials.Credentials.from_authorized_user_file(AUTH_USER_JSON_FILE)
  642. assert creds.client_secret == info["client_secret"]
  643. assert creds.client_id == info["client_id"]
  644. assert creds.refresh_token == info["refresh_token"]
  645. assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
  646. assert creds.scopes is None
  647. scopes = ["email", "profile"]
  648. creds = credentials.Credentials.from_authorized_user_file(
  649. AUTH_USER_JSON_FILE, scopes
  650. )
  651. assert creds.client_secret == info["client_secret"]
  652. assert creds.client_id == info["client_id"]
  653. assert creds.refresh_token == info["refresh_token"]
  654. assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
  655. assert creds.scopes == scopes
  656. def test_to_json(self):
  657. info = AUTH_USER_INFO.copy()
  658. expiry = datetime.datetime(2020, 8, 14, 15, 54, 1)
  659. info["expiry"] = expiry.isoformat() + "Z"
  660. creds = credentials.Credentials.from_authorized_user_info(info)
  661. assert creds.expiry == expiry
  662. # Test with no `strip` arg
  663. json_output = creds.to_json()
  664. json_asdict = json.loads(json_output)
  665. assert json_asdict.get("token") == creds.token
  666. assert json_asdict.get("refresh_token") == creds.refresh_token
  667. assert json_asdict.get("token_uri") == creds.token_uri
  668. assert json_asdict.get("client_id") == creds.client_id
  669. assert json_asdict.get("scopes") == creds.scopes
  670. assert json_asdict.get("client_secret") == creds.client_secret
  671. assert json_asdict.get("expiry") == info["expiry"]
  672. # Test with a `strip` arg
  673. json_output = creds.to_json(strip=["client_secret"])
  674. json_asdict = json.loads(json_output)
  675. assert json_asdict.get("token") == creds.token
  676. assert json_asdict.get("refresh_token") == creds.refresh_token
  677. assert json_asdict.get("token_uri") == creds.token_uri
  678. assert json_asdict.get("client_id") == creds.client_id
  679. assert json_asdict.get("scopes") == creds.scopes
  680. assert json_asdict.get("client_secret") is None
  681. # Test with no expiry
  682. creds.expiry = None
  683. json_output = creds.to_json()
  684. json_asdict = json.loads(json_output)
  685. assert json_asdict.get("expiry") is None
  686. def test_pickle_and_unpickle(self):
  687. creds = self.make_credentials()
  688. unpickled = pickle.loads(pickle.dumps(creds))
  689. # make sure attributes aren't lost during pickling
  690. assert list(creds.__dict__).sort() == list(unpickled.__dict__).sort()
  691. for attr in list(creds.__dict__):
  692. assert getattr(creds, attr) == getattr(unpickled, attr)
  693. def test_pickle_and_unpickle_with_refresh_handler(self):
  694. expected_expiry = _helpers.utcnow() + datetime.timedelta(seconds=2800)
  695. refresh_handler = mock.Mock(return_value=("TOKEN", expected_expiry))
  696. creds = credentials.Credentials(
  697. token=None,
  698. refresh_token=None,
  699. token_uri=None,
  700. client_id=None,
  701. client_secret=None,
  702. rapt_token=None,
  703. refresh_handler=refresh_handler,
  704. )
  705. unpickled = pickle.loads(pickle.dumps(creds))
  706. # make sure attributes aren't lost during pickling
  707. assert list(creds.__dict__).sort() == list(unpickled.__dict__).sort()
  708. for attr in list(creds.__dict__):
  709. # For the _refresh_handler property, the unpickled creds should be
  710. # set to None.
  711. if attr == "_refresh_handler":
  712. assert getattr(unpickled, attr) is None
  713. else:
  714. assert getattr(creds, attr) == getattr(unpickled, attr)
  715. def test_pickle_with_missing_attribute(self):
  716. creds = self.make_credentials()
  717. # remove an optional attribute before pickling
  718. # this mimics a pickle created with a previous class definition with
  719. # fewer attributes
  720. del creds.__dict__["_quota_project_id"]
  721. unpickled = pickle.loads(pickle.dumps(creds))
  722. # Attribute should be initialized by `__setstate__`
  723. assert unpickled.quota_project_id is None
  724. # pickles are not compatible across versions
  725. @pytest.mark.skipif(
  726. sys.version_info < (3, 5),
  727. reason="pickle file can only be loaded with Python >= 3.5",
  728. )
  729. def test_unpickle_old_credentials_pickle(self):
  730. # make sure a credentials file pickled with an older
  731. # library version (google-auth==1.5.1) can be unpickled
  732. with open(
  733. os.path.join(DATA_DIR, "old_oauth_credentials_py3.pickle"), "rb"
  734. ) as f:
  735. credentials = pickle.load(f)
  736. assert credentials.quota_project_id is None
  737. class TestUserAccessTokenCredentials(object):
  738. def test_instance(self):
  739. cred = credentials.UserAccessTokenCredentials()
  740. assert cred._account is None
  741. cred = cred.with_account("account")
  742. assert cred._account == "account"
  743. @mock.patch("google.auth._cloud_sdk.get_auth_access_token", autospec=True)
  744. def test_refresh(self, get_auth_access_token):
  745. get_auth_access_token.return_value = "access_token"
  746. cred = credentials.UserAccessTokenCredentials()
  747. cred.refresh(None)
  748. assert cred.token == "access_token"
  749. def test_with_quota_project(self):
  750. cred = credentials.UserAccessTokenCredentials()
  751. quota_project_cred = cred.with_quota_project("project-foo")
  752. assert quota_project_cred._quota_project_id == "project-foo"
  753. assert quota_project_cred._account == cred._account
  754. @mock.patch(
  755. "google.oauth2.credentials.UserAccessTokenCredentials.apply", autospec=True
  756. )
  757. @mock.patch(
  758. "google.oauth2.credentials.UserAccessTokenCredentials.refresh", autospec=True
  759. )
  760. def test_before_request(self, refresh, apply):
  761. cred = credentials.UserAccessTokenCredentials()
  762. cred.before_request(mock.Mock(), "GET", "https://example.com", {})
  763. refresh.assert_called()
  764. apply.assert_called()