# test_service_account.py
# Copyright 2016 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  14. import datetime
  15. import json
  16. import os
  17. import mock
  18. import pytest # type: ignore
  19. from google.auth import _helpers
  20. from google.auth import crypt
  21. from google.auth import exceptions
  22. from google.auth import jwt
  23. from google.auth import transport
  24. from google.oauth2 import service_account
  25. import yatest.common as yc
  26. DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "..", "data")
  27. with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
  28. PRIVATE_KEY_BYTES = fh.read()
  29. with open(os.path.join(DATA_DIR, "public_cert.pem"), "rb") as fh:
  30. PUBLIC_CERT_BYTES = fh.read()
  31. with open(os.path.join(DATA_DIR, "other_cert.pem"), "rb") as fh:
  32. OTHER_CERT_BYTES = fh.read()
  33. SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
  34. SERVICE_ACCOUNT_NON_GDU_JSON_FILE = os.path.join(
  35. DATA_DIR, "service_account_non_gdu.json"
  36. )
  37. FAKE_UNIVERSE_DOMAIN = "universe.foo"
  38. with open(SERVICE_ACCOUNT_JSON_FILE, "rb") as fh:
  39. SERVICE_ACCOUNT_INFO = json.load(fh)
  40. with open(SERVICE_ACCOUNT_NON_GDU_JSON_FILE, "rb") as fh:
  41. SERVICE_ACCOUNT_INFO_NON_GDU = json.load(fh)
  42. SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
  43. class TestCredentials(object):
  44. SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
  45. TOKEN_URI = "https://example.com/oauth2/token"
  46. @classmethod
  47. def make_credentials(cls, universe_domain=service_account._DEFAULT_UNIVERSE_DOMAIN):
  48. return service_account.Credentials(
  49. SIGNER,
  50. cls.SERVICE_ACCOUNT_EMAIL,
  51. cls.TOKEN_URI,
  52. universe_domain=universe_domain,
  53. )
  54. def test_constructor_no_universe_domain(self):
  55. credentials = service_account.Credentials(
  56. SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI, universe_domain=None
  57. )
  58. assert credentials.universe_domain == service_account._DEFAULT_UNIVERSE_DOMAIN
  59. def test_from_service_account_info(self):
  60. credentials = service_account.Credentials.from_service_account_info(
  61. SERVICE_ACCOUNT_INFO
  62. )
  63. assert credentials._signer.key_id == SERVICE_ACCOUNT_INFO["private_key_id"]
  64. assert credentials.service_account_email == SERVICE_ACCOUNT_INFO["client_email"]
  65. assert credentials._token_uri == SERVICE_ACCOUNT_INFO["token_uri"]
  66. assert credentials._universe_domain == service_account._DEFAULT_UNIVERSE_DOMAIN
  67. assert not credentials._always_use_jwt_access
  68. def test_from_service_account_info_non_gdu(self):
  69. credentials = service_account.Credentials.from_service_account_info(
  70. SERVICE_ACCOUNT_INFO_NON_GDU
  71. )
  72. assert credentials.universe_domain == FAKE_UNIVERSE_DOMAIN
  73. assert credentials._always_use_jwt_access
  74. def test_from_service_account_info_args(self):
  75. info = SERVICE_ACCOUNT_INFO.copy()
  76. scopes = ["email", "profile"]
  77. subject = "subject"
  78. additional_claims = {"meta": "data"}
  79. credentials = service_account.Credentials.from_service_account_info(
  80. info, scopes=scopes, subject=subject, additional_claims=additional_claims
  81. )
  82. assert credentials.service_account_email == info["client_email"]
  83. assert credentials.project_id == info["project_id"]
  84. assert credentials._signer.key_id == info["private_key_id"]
  85. assert credentials._token_uri == info["token_uri"]
  86. assert credentials._scopes == scopes
  87. assert credentials._subject == subject
  88. assert credentials._additional_claims == additional_claims
  89. assert not credentials._always_use_jwt_access
  90. def test_from_service_account_file(self):
  91. info = SERVICE_ACCOUNT_INFO.copy()
  92. credentials = service_account.Credentials.from_service_account_file(
  93. SERVICE_ACCOUNT_JSON_FILE
  94. )
  95. assert credentials.service_account_email == info["client_email"]
  96. assert credentials.project_id == info["project_id"]
  97. assert credentials._signer.key_id == info["private_key_id"]
  98. assert credentials._token_uri == info["token_uri"]
  99. def test_from_service_account_file_non_gdu(self):
  100. info = SERVICE_ACCOUNT_INFO_NON_GDU.copy()
  101. credentials = service_account.Credentials.from_service_account_file(
  102. SERVICE_ACCOUNT_NON_GDU_JSON_FILE
  103. )
  104. assert credentials.service_account_email == info["client_email"]
  105. assert credentials.project_id == info["project_id"]
  106. assert credentials._signer.key_id == info["private_key_id"]
  107. assert credentials._token_uri == info["token_uri"]
  108. assert credentials._universe_domain == FAKE_UNIVERSE_DOMAIN
  109. assert credentials._always_use_jwt_access
  110. def test_from_service_account_file_args(self):
  111. info = SERVICE_ACCOUNT_INFO.copy()
  112. scopes = ["email", "profile"]
  113. subject = "subject"
  114. additional_claims = {"meta": "data"}
  115. credentials = service_account.Credentials.from_service_account_file(
  116. SERVICE_ACCOUNT_JSON_FILE,
  117. subject=subject,
  118. scopes=scopes,
  119. additional_claims=additional_claims,
  120. )
  121. assert credentials.service_account_email == info["client_email"]
  122. assert credentials.project_id == info["project_id"]
  123. assert credentials._signer.key_id == info["private_key_id"]
  124. assert credentials._token_uri == info["token_uri"]
  125. assert credentials._scopes == scopes
  126. assert credentials._subject == subject
  127. assert credentials._additional_claims == additional_claims
  128. def test_default_state(self):
  129. credentials = self.make_credentials()
  130. assert not credentials.valid
  131. # Expiration hasn't been set yet
  132. assert not credentials.expired
  133. # Scopes haven't been specified yet
  134. assert credentials.requires_scopes
  135. def test_sign_bytes(self):
  136. credentials = self.make_credentials()
  137. to_sign = b"123"
  138. signature = credentials.sign_bytes(to_sign)
  139. assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
  140. def test_signer(self):
  141. credentials = self.make_credentials()
  142. assert isinstance(credentials.signer, crypt.Signer)
  143. def test_signer_email(self):
  144. credentials = self.make_credentials()
  145. assert credentials.signer_email == self.SERVICE_ACCOUNT_EMAIL
  146. def test_create_scoped(self):
  147. credentials = self.make_credentials()
  148. scopes = ["email", "profile"]
  149. credentials = credentials.with_scopes(scopes)
  150. assert credentials._scopes == scopes
  151. def test_with_claims(self):
  152. credentials = self.make_credentials()
  153. new_credentials = credentials.with_claims({"meep": "moop"})
  154. assert new_credentials._additional_claims == {"meep": "moop"}
  155. def test_with_quota_project(self):
  156. credentials = self.make_credentials()
  157. new_credentials = credentials.with_quota_project("new-project-456")
  158. assert new_credentials.quota_project_id == "new-project-456"
  159. hdrs = {}
  160. new_credentials.apply(hdrs, token="tok")
  161. assert "x-goog-user-project" in hdrs
  162. def test_with_token_uri(self):
  163. credentials = self.make_credentials()
  164. new_token_uri = "https://example2.com/oauth2/token"
  165. assert credentials._token_uri == self.TOKEN_URI
  166. creds_with_new_token_uri = credentials.with_token_uri(new_token_uri)
  167. assert creds_with_new_token_uri._token_uri == new_token_uri
  168. def test_with_universe_domain(self):
  169. credentials = self.make_credentials()
  170. new_credentials = credentials.with_universe_domain("dummy_universe.com")
  171. assert new_credentials.universe_domain == "dummy_universe.com"
  172. assert new_credentials._always_use_jwt_access
  173. new_credentials = credentials.with_universe_domain("googleapis.com")
  174. assert new_credentials.universe_domain == "googleapis.com"
  175. assert not new_credentials._always_use_jwt_access
  176. def test__with_always_use_jwt_access(self):
  177. credentials = self.make_credentials()
  178. assert not credentials._always_use_jwt_access
  179. new_credentials = credentials.with_always_use_jwt_access(True)
  180. assert new_credentials._always_use_jwt_access
  181. def test__with_always_use_jwt_access_non_default_universe_domain(self):
  182. credentials = self.make_credentials(universe_domain=FAKE_UNIVERSE_DOMAIN)
  183. with pytest.raises(exceptions.InvalidValue) as excinfo:
  184. credentials.with_always_use_jwt_access(False)
  185. assert excinfo.match(
  186. "always_use_jwt_access should be True for non-default universe domain"
  187. )
  188. def test__make_authorization_grant_assertion(self):
  189. credentials = self.make_credentials()
  190. token = credentials._make_authorization_grant_assertion()
  191. payload = jwt.decode(token, PUBLIC_CERT_BYTES)
  192. assert payload["iss"] == self.SERVICE_ACCOUNT_EMAIL
  193. assert payload["aud"] == service_account._GOOGLE_OAUTH2_TOKEN_ENDPOINT
  194. def test__make_authorization_grant_assertion_scoped(self):
  195. credentials = self.make_credentials()
  196. scopes = ["email", "profile"]
  197. credentials = credentials.with_scopes(scopes)
  198. token = credentials._make_authorization_grant_assertion()
  199. payload = jwt.decode(token, PUBLIC_CERT_BYTES)
  200. assert payload["scope"] == "email profile"
  201. def test__make_authorization_grant_assertion_subject(self):
  202. credentials = self.make_credentials()
  203. subject = "user@example.com"
  204. credentials = credentials.with_subject(subject)
  205. token = credentials._make_authorization_grant_assertion()
  206. payload = jwt.decode(token, PUBLIC_CERT_BYTES)
  207. assert payload["sub"] == subject
  208. def test_apply_with_quota_project_id(self):
  209. credentials = service_account.Credentials(
  210. SIGNER,
  211. self.SERVICE_ACCOUNT_EMAIL,
  212. self.TOKEN_URI,
  213. quota_project_id="quota-project-123",
  214. )
  215. headers = {}
  216. credentials.apply(headers, token="token")
  217. assert headers["x-goog-user-project"] == "quota-project-123"
  218. assert "token" in headers["authorization"]
  219. def test_apply_with_no_quota_project_id(self):
  220. credentials = service_account.Credentials(
  221. SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI
  222. )
  223. headers = {}
  224. credentials.apply(headers, token="token")
  225. assert "x-goog-user-project" not in headers
  226. assert "token" in headers["authorization"]
  227. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  228. def test__create_self_signed_jwt(self, jwt):
  229. credentials = service_account.Credentials(
  230. SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI
  231. )
  232. audience = "https://pubsub.googleapis.com"
  233. credentials._create_self_signed_jwt(audience)
  234. jwt.from_signing_credentials.assert_called_once_with(credentials, audience)
  235. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  236. def test__create_self_signed_jwt_with_user_scopes(self, jwt):
  237. credentials = service_account.Credentials(
  238. SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI, scopes=["foo"]
  239. )
  240. audience = "https://pubsub.googleapis.com"
  241. credentials._create_self_signed_jwt(audience)
  242. # JWT should not be created if there are user-defined scopes
  243. jwt.from_signing_credentials.assert_not_called()
  244. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  245. def test__create_self_signed_jwt_always_use_jwt_access_with_audience(self, jwt):
  246. credentials = service_account.Credentials(
  247. SIGNER,
  248. self.SERVICE_ACCOUNT_EMAIL,
  249. self.TOKEN_URI,
  250. default_scopes=["bar", "foo"],
  251. always_use_jwt_access=True,
  252. )
  253. audience = "https://pubsub.googleapis.com"
  254. credentials._create_self_signed_jwt(audience)
  255. jwt.from_signing_credentials.assert_called_once_with(credentials, audience)
  256. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  257. def test__create_self_signed_jwt_always_use_jwt_access_with_audience_similar_jwt_is_reused(
  258. self, jwt
  259. ):
  260. credentials = service_account.Credentials(
  261. SIGNER,
  262. self.SERVICE_ACCOUNT_EMAIL,
  263. self.TOKEN_URI,
  264. default_scopes=["bar", "foo"],
  265. always_use_jwt_access=True,
  266. )
  267. audience = "https://pubsub.googleapis.com"
  268. credentials._create_self_signed_jwt(audience)
  269. credentials._jwt_credentials._audience = audience
  270. credentials._create_self_signed_jwt(audience)
  271. jwt.from_signing_credentials.assert_called_once_with(credentials, audience)
  272. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  273. def test__create_self_signed_jwt_always_use_jwt_access_with_scopes(self, jwt):
  274. credentials = service_account.Credentials(
  275. SIGNER,
  276. self.SERVICE_ACCOUNT_EMAIL,
  277. self.TOKEN_URI,
  278. scopes=["bar", "foo"],
  279. always_use_jwt_access=True,
  280. )
  281. audience = "https://pubsub.googleapis.com"
  282. credentials._create_self_signed_jwt(audience)
  283. jwt.from_signing_credentials.assert_called_once_with(
  284. credentials, None, additional_claims={"scope": "bar foo"}
  285. )
  286. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  287. def test__create_self_signed_jwt_always_use_jwt_access_with_scopes_similar_jwt_is_reused(
  288. self, jwt
  289. ):
  290. credentials = service_account.Credentials(
  291. SIGNER,
  292. self.SERVICE_ACCOUNT_EMAIL,
  293. self.TOKEN_URI,
  294. scopes=["bar", "foo"],
  295. always_use_jwt_access=True,
  296. )
  297. audience = "https://pubsub.googleapis.com"
  298. credentials._create_self_signed_jwt(audience)
  299. credentials._jwt_credentials.additional_claims = {"scope": "bar foo"}
  300. credentials._create_self_signed_jwt(audience)
  301. jwt.from_signing_credentials.assert_called_once_with(
  302. credentials, None, additional_claims={"scope": "bar foo"}
  303. )
  304. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  305. def test__create_self_signed_jwt_always_use_jwt_access_with_default_scopes(
  306. self, jwt
  307. ):
  308. credentials = service_account.Credentials(
  309. SIGNER,
  310. self.SERVICE_ACCOUNT_EMAIL,
  311. self.TOKEN_URI,
  312. default_scopes=["bar", "foo"],
  313. always_use_jwt_access=True,
  314. )
  315. credentials._create_self_signed_jwt(None)
  316. jwt.from_signing_credentials.assert_called_once_with(
  317. credentials, None, additional_claims={"scope": "bar foo"}
  318. )
  319. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  320. def test__create_self_signed_jwt_always_use_jwt_access_with_default_scopes_similar_jwt_is_reused(
  321. self, jwt
  322. ):
  323. credentials = service_account.Credentials(
  324. SIGNER,
  325. self.SERVICE_ACCOUNT_EMAIL,
  326. self.TOKEN_URI,
  327. default_scopes=["bar", "foo"],
  328. always_use_jwt_access=True,
  329. )
  330. credentials._create_self_signed_jwt(None)
  331. credentials._jwt_credentials.additional_claims = {"scope": "bar foo"}
  332. credentials._create_self_signed_jwt(None)
  333. jwt.from_signing_credentials.assert_called_once_with(
  334. credentials, None, additional_claims={"scope": "bar foo"}
  335. )
  336. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  337. def test__create_self_signed_jwt_always_use_jwt_access(self, jwt):
  338. credentials = service_account.Credentials(
  339. SIGNER,
  340. self.SERVICE_ACCOUNT_EMAIL,
  341. self.TOKEN_URI,
  342. always_use_jwt_access=True,
  343. )
  344. credentials._create_self_signed_jwt(None)
  345. jwt.from_signing_credentials.assert_not_called()
  346. def test_token_usage_metrics_assertion(self):
  347. credentials = service_account.Credentials(
  348. SIGNER,
  349. self.SERVICE_ACCOUNT_EMAIL,
  350. self.TOKEN_URI,
  351. always_use_jwt_access=False,
  352. )
  353. credentials.token = "token"
  354. credentials.expiry = None
  355. headers = {}
  356. credentials.before_request(mock.Mock(), None, None, headers)
  357. assert headers["authorization"] == "Bearer token"
  358. assert headers["x-goog-api-client"] == "cred-type/sa"
  359. def test_token_usage_metrics_self_signed_jwt(self):
  360. credentials = service_account.Credentials(
  361. SIGNER,
  362. self.SERVICE_ACCOUNT_EMAIL,
  363. self.TOKEN_URI,
  364. always_use_jwt_access=True,
  365. )
  366. credentials._create_self_signed_jwt("foo.googleapis.com")
  367. credentials.token = "token"
  368. credentials.expiry = None
  369. headers = {}
  370. credentials.before_request(mock.Mock(), None, None, headers)
  371. assert headers["authorization"] == "Bearer token"
  372. assert headers["x-goog-api-client"] == "cred-type/jwt"
  373. @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
  374. def test_refresh_success(self, jwt_grant):
  375. credentials = self.make_credentials()
  376. token = "token"
  377. jwt_grant.return_value = (
  378. token,
  379. _helpers.utcnow() + datetime.timedelta(seconds=500),
  380. {},
  381. )
  382. request = mock.create_autospec(transport.Request, instance=True)
  383. # Refresh credentials
  384. credentials.refresh(request)
  385. # Check jwt grant call.
  386. assert jwt_grant.called
  387. called_request, token_uri, assertion = jwt_grant.call_args[0]
  388. assert called_request == request
  389. assert token_uri == credentials._token_uri
  390. assert jwt.decode(assertion, PUBLIC_CERT_BYTES)
  391. # No further assertion done on the token, as there are separate tests
  392. # for checking the authorization grant assertion.
  393. # Check that the credentials have the token.
  394. assert credentials.token == token
  395. # Check that the credentials are valid (have a token and are not
  396. # expired)
  397. assert credentials.valid
  398. @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
  399. def test_before_request_refreshes(self, jwt_grant):
  400. credentials = self.make_credentials()
  401. token = "token"
  402. jwt_grant.return_value = (
  403. token,
  404. _helpers.utcnow() + datetime.timedelta(seconds=500),
  405. None,
  406. )
  407. request = mock.create_autospec(transport.Request, instance=True)
  408. # Credentials should start as invalid
  409. assert not credentials.valid
  410. # before_request should cause a refresh
  411. credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
  412. # The refresh endpoint should've been called.
  413. assert jwt_grant.called
  414. # Credentials should now be valid.
  415. assert credentials.valid
  416. @mock.patch("google.auth.jwt.Credentials._make_jwt")
  417. def test_refresh_with_jwt_credentials(self, make_jwt):
  418. credentials = self.make_credentials()
  419. credentials._create_self_signed_jwt("https://pubsub.googleapis.com")
  420. request = mock.create_autospec(transport.Request, instance=True)
  421. token = "token"
  422. expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
  423. make_jwt.return_value = (b"token", expiry)
  424. # Credentials should start as invalid
  425. assert not credentials.valid
  426. # before_request should cause a refresh
  427. credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
  428. # Credentials should now be valid.
  429. assert credentials.valid
  430. # Assert make_jwt was called
  431. assert make_jwt.call_count == 1
  432. assert credentials.token == token
  433. assert credentials.expiry == expiry
  434. def test_refresh_with_jwt_credentials_token_type_check(self):
  435. credentials = self.make_credentials()
  436. credentials._create_self_signed_jwt("https://pubsub.googleapis.com")
  437. credentials.refresh(mock.Mock())
  438. # Credentials token should be a JWT string.
  439. assert isinstance(credentials.token, str)
  440. payload = jwt.decode(credentials.token, verify=False)
  441. assert payload["aud"] == "https://pubsub.googleapis.com"
  442. @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
  443. @mock.patch("google.auth.jwt.Credentials.refresh", autospec=True)
  444. def test_refresh_jwt_not_used_for_domain_wide_delegation(
  445. self, self_signed_jwt_refresh, jwt_grant
  446. ):
  447. # Create a domain wide delegation credentials by setting the subject.
  448. credentials = service_account.Credentials(
  449. SIGNER,
  450. self.SERVICE_ACCOUNT_EMAIL,
  451. self.TOKEN_URI,
  452. always_use_jwt_access=True,
  453. subject="subject",
  454. )
  455. credentials._create_self_signed_jwt("https://pubsub.googleapis.com")
  456. jwt_grant.return_value = (
  457. "token",
  458. _helpers.utcnow() + datetime.timedelta(seconds=500),
  459. {},
  460. )
  461. request = mock.create_autospec(transport.Request, instance=True)
  462. # Refresh credentials
  463. credentials.refresh(request)
  464. # Make sure we are using jwt_grant and not self signed JWT refresh
  465. # method to obtain the token.
  466. assert jwt_grant.called
  467. assert not self_signed_jwt_refresh.called
  468. def test_refresh_missing_jwt_credentials(self):
  469. credentials = self.make_credentials()
  470. credentials = credentials.with_scopes(["foo", "bar"])
  471. credentials = credentials.with_always_use_jwt_access(True)
  472. assert not credentials._jwt_credentials
  473. credentials.refresh(mock.Mock())
  474. # jwt credentials should have been automatically created with scopes
  475. assert credentials._jwt_credentials is not None
  476. def test_refresh_non_gdu_domain_wide_delegation_not_supported(self):
  477. credentials = self.make_credentials(universe_domain="foo")
  478. credentials._subject = "bar@example.com"
  479. credentials._create_self_signed_jwt("https://pubsub.googleapis.com")
  480. with pytest.raises(exceptions.RefreshError) as excinfo:
  481. credentials.refresh(None)
  482. assert excinfo.match("domain wide delegation is not supported")
  483. class TestIDTokenCredentials(object):
  484. SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
  485. TOKEN_URI = "https://example.com/oauth2/token"
  486. TARGET_AUDIENCE = "https://example.com"
  487. @classmethod
  488. def make_credentials(cls, universe_domain=service_account._DEFAULT_UNIVERSE_DOMAIN):
  489. return service_account.IDTokenCredentials(
  490. SIGNER,
  491. cls.SERVICE_ACCOUNT_EMAIL,
  492. cls.TOKEN_URI,
  493. cls.TARGET_AUDIENCE,
  494. universe_domain=universe_domain,
  495. )
  496. def test_constructor_no_universe_domain(self):
  497. credentials = service_account.IDTokenCredentials(
  498. SIGNER,
  499. self.SERVICE_ACCOUNT_EMAIL,
  500. self.TOKEN_URI,
  501. self.TARGET_AUDIENCE,
  502. universe_domain=None,
  503. )
  504. assert credentials._universe_domain == service_account._DEFAULT_UNIVERSE_DOMAIN
  505. def test_from_service_account_info(self):
  506. credentials = service_account.IDTokenCredentials.from_service_account_info(
  507. SERVICE_ACCOUNT_INFO, target_audience=self.TARGET_AUDIENCE
  508. )
  509. assert credentials._signer.key_id == SERVICE_ACCOUNT_INFO["private_key_id"]
  510. assert credentials.service_account_email == SERVICE_ACCOUNT_INFO["client_email"]
  511. assert credentials._token_uri == SERVICE_ACCOUNT_INFO["token_uri"]
  512. assert credentials._target_audience == self.TARGET_AUDIENCE
  513. assert not credentials._use_iam_endpoint
  514. def test_from_service_account_info_non_gdu(self):
  515. credentials = service_account.IDTokenCredentials.from_service_account_info(
  516. SERVICE_ACCOUNT_INFO_NON_GDU, target_audience=self.TARGET_AUDIENCE
  517. )
  518. assert (
  519. credentials._signer.key_id == SERVICE_ACCOUNT_INFO_NON_GDU["private_key_id"]
  520. )
  521. assert (
  522. credentials.service_account_email
  523. == SERVICE_ACCOUNT_INFO_NON_GDU["client_email"]
  524. )
  525. assert credentials._token_uri == SERVICE_ACCOUNT_INFO_NON_GDU["token_uri"]
  526. assert credentials._target_audience == self.TARGET_AUDIENCE
  527. assert credentials._use_iam_endpoint
  528. def test_from_service_account_file(self):
  529. info = SERVICE_ACCOUNT_INFO.copy()
  530. credentials = service_account.IDTokenCredentials.from_service_account_file(
  531. SERVICE_ACCOUNT_JSON_FILE, target_audience=self.TARGET_AUDIENCE
  532. )
  533. assert credentials.service_account_email == info["client_email"]
  534. assert credentials._signer.key_id == info["private_key_id"]
  535. assert credentials._token_uri == info["token_uri"]
  536. assert credentials._target_audience == self.TARGET_AUDIENCE
  537. assert not credentials._use_iam_endpoint
  538. def test_from_service_account_file_non_gdu(self):
  539. info = SERVICE_ACCOUNT_INFO_NON_GDU.copy()
  540. credentials = service_account.IDTokenCredentials.from_service_account_file(
  541. SERVICE_ACCOUNT_NON_GDU_JSON_FILE, target_audience=self.TARGET_AUDIENCE
  542. )
  543. assert credentials.service_account_email == info["client_email"]
  544. assert credentials._signer.key_id == info["private_key_id"]
  545. assert credentials._token_uri == info["token_uri"]
  546. assert credentials._target_audience == self.TARGET_AUDIENCE
  547. assert credentials._use_iam_endpoint
  548. def test_default_state(self):
  549. credentials = self.make_credentials()
  550. assert not credentials.valid
  551. # Expiration hasn't been set yet
  552. assert not credentials.expired
  553. def test_sign_bytes(self):
  554. credentials = self.make_credentials()
  555. to_sign = b"123"
  556. signature = credentials.sign_bytes(to_sign)
  557. assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
  558. def test_signer(self):
  559. credentials = self.make_credentials()
  560. assert isinstance(credentials.signer, crypt.Signer)
  561. def test_signer_email(self):
  562. credentials = self.make_credentials()
  563. assert credentials.signer_email == self.SERVICE_ACCOUNT_EMAIL
  564. def test_with_target_audience(self):
  565. credentials = self.make_credentials()
  566. new_credentials = credentials.with_target_audience("https://new.example.com")
  567. assert new_credentials._target_audience == "https://new.example.com"
  568. def test__with_use_iam_endpoint(self):
  569. credentials = self.make_credentials()
  570. new_credentials = credentials._with_use_iam_endpoint(True)
  571. assert new_credentials._use_iam_endpoint
  572. def test__with_use_iam_endpoint_non_default_universe_domain(self):
  573. credentials = self.make_credentials(universe_domain=FAKE_UNIVERSE_DOMAIN)
  574. with pytest.raises(exceptions.InvalidValue) as excinfo:
  575. credentials._with_use_iam_endpoint(False)
  576. assert excinfo.match(
  577. "use_iam_endpoint should be True for non-default universe domain"
  578. )
  579. def test_with_quota_project(self):
  580. credentials = self.make_credentials()
  581. new_credentials = credentials.with_quota_project("project-foo")
  582. assert new_credentials._quota_project_id == "project-foo"
  583. def test_with_token_uri(self):
  584. credentials = self.make_credentials()
  585. new_token_uri = "https://example2.com/oauth2/token"
  586. assert credentials._token_uri == self.TOKEN_URI
  587. creds_with_new_token_uri = credentials.with_token_uri(new_token_uri)
  588. assert creds_with_new_token_uri._token_uri == new_token_uri
  589. def test__make_authorization_grant_assertion(self):
  590. credentials = self.make_credentials()
  591. token = credentials._make_authorization_grant_assertion()
  592. payload = jwt.decode(token, PUBLIC_CERT_BYTES)
  593. assert payload["iss"] == self.SERVICE_ACCOUNT_EMAIL
  594. assert payload["aud"] == service_account._GOOGLE_OAUTH2_TOKEN_ENDPOINT
  595. assert payload["target_audience"] == self.TARGET_AUDIENCE
  596. @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
  597. def test_refresh_success(self, id_token_jwt_grant):
  598. credentials = self.make_credentials()
  599. token = "token"
  600. id_token_jwt_grant.return_value = (
  601. token,
  602. _helpers.utcnow() + datetime.timedelta(seconds=500),
  603. {},
  604. )
  605. request = mock.create_autospec(transport.Request, instance=True)
  606. # Refresh credentials
  607. credentials.refresh(request)
  608. # Check jwt grant call.
  609. assert id_token_jwt_grant.called
  610. called_request, token_uri, assertion = id_token_jwt_grant.call_args[0]
  611. assert called_request == request
  612. assert token_uri == credentials._token_uri
  613. assert jwt.decode(assertion, PUBLIC_CERT_BYTES)
  614. # No further assertion done on the token, as there are separate tests
  615. # for checking the authorization grant assertion.
  616. # Check that the credentials have the token.
  617. assert credentials.token == token
  618. # Check that the credentials are valid (have a token and are not
  619. # expired)
  620. assert credentials.valid
  621. @mock.patch(
  622. "google.oauth2._client.call_iam_generate_id_token_endpoint", autospec=True
  623. )
  624. def test_refresh_iam_flow(self, call_iam_generate_id_token_endpoint):
  625. credentials = self.make_credentials()
  626. credentials._use_iam_endpoint = True
  627. token = "id_token"
  628. call_iam_generate_id_token_endpoint.return_value = (
  629. token,
  630. _helpers.utcnow() + datetime.timedelta(seconds=500),
  631. )
  632. request = mock.Mock()
  633. credentials.refresh(request)
  634. req, signer_email, target_audience, access_token = call_iam_generate_id_token_endpoint.call_args[
  635. 0
  636. ]
  637. assert req == request
  638. assert signer_email == "service-account@example.com"
  639. assert target_audience == "https://example.com"
  640. decoded_access_token = jwt.decode(access_token, verify=False)
  641. assert decoded_access_token["scope"] == "https://www.googleapis.com/auth/iam"
  642. @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
  643. def test_before_request_refreshes(self, id_token_jwt_grant):
  644. credentials = self.make_credentials()
  645. token = "token"
  646. id_token_jwt_grant.return_value = (
  647. token,
  648. _helpers.utcnow() + datetime.timedelta(seconds=500),
  649. None,
  650. )
  651. request = mock.create_autospec(transport.Request, instance=True)
  652. # Credentials should start as invalid
  653. assert not credentials.valid
  654. # before_request should cause a refresh
  655. credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
  656. # The refresh endpoint should've been called.
  657. assert id_token_jwt_grant.called
  658. # Credentials should now be valid.
  659. assert credentials.valid